This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3fc830a19c Be able to detect and remove orphaned compression dictionaries
3fc830a19c is described below

commit 3fc830a19c67b7869398e6929f7be4eb1bd03ae5
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Wed Feb 4 18:49:01 2026 +1100

    Be able to detect and remove orphaned compression dictionaries
    
    patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-21157
---
 CHANGES.txt                                        |   1 +
 .../pages/managing/operating/compression.adoc      |   2 +
 .../db/compression/CompressionDictionary.java      |  18 +++
 .../CompressionDictionaryDetailsTabularData.java   |  28 +++-
 .../CompressionDictionaryEventHandler.java         |   2 +-
 .../compression/CompressionDictionaryManager.java  |  18 ++-
 .../CompressionDictionaryScheduler.java            |   5 +-
 .../schema/SystemDistributedKeyspace.java          | 116 +++++++++++---
 .../apache/cassandra/service/StorageService.java   |  24 +++
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |  14 ++
 .../CompressionDictionaryCommandGroup.java         | 130 +++++++++++----
 test/resources/nodetool/help/compressiondictionary |  12 ++
 .../nodetool/help/compressiondictionary$cleanup    |  34 ++++
 .../CompressionDictionaryDataObjectTest.java       |  18 ++-
 .../CompressionDictionaryOrphanedTest.java         | 177 +++++++++++++++++++++
 .../CompressionDictionarySchedulerTest.java        |   8 +-
 ...stributedKeyspaceCompressionDictionaryTest.java |  42 ++---
 .../ExportImportListCompressionDictionaryTest.java |   4 +-
 19 files changed, 569 insertions(+), 88 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a56f0b8df9..f94cd94b4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Be able to detect and remove orphaned compression dictionaries (CASSANDRA-21157)
  * Fix BigTableVerifier to only read a data file during extended verification (CASSANDRA-21150) 
  * Reduce memory allocation during transformation of BatchStatement to Mutation (CASSANDRA-21141)
  * Direct I/O support for compaction reads (CASSANDRA-19987)
diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc 
b/doc/modules/cassandra/pages/managing/operating/compression.adoc
index a5a06c4e00..97de5c12c0 100644
--- a/doc/modules/cassandra/pages/managing/operating/compression.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -403,6 +403,8 @@ There are these four commands for now related to 
compression dictionaries:
 * export - a user can export a compression dictionary of a keyspace and a 
table, either the last one or
 by a specific id, to a file.
 * import - a user can import a compression dictionary, exported by above 
command, from a file to a cluster.
+* cleanup - removes orphaned dictionaries, i.e. dictionaries whose table no longer exists because it was dropped.
+A user can list orphaned dictionaries without deleting them by using the `-d` / `--dry` flag.
 
 For `train` subcommand, it is possible to specify:
 
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index bbacacd85c..30f37a0b55 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -261,11 +261,13 @@ public interface CompressionDictionary
         int size = row.getInt("dict_length");
         String keyspaceName = row.getString("keyspace_name");
         String tableName = row.getString("table_name");
+        String tableId = row.getString("table_id");
 
         try
         {
             return new LightweightCompressionDictionary(keyspaceName,
                                                         tableName,
+                                                        tableId,
                                                         new 
DictId(CompressionDictionary.Kind.valueOf(kindStr), dictId),
                                                         checksum,
                                                         size);
@@ -430,21 +432,37 @@ public interface CompressionDictionary
     {
         public final String keyspaceName;
         public final String tableName;
+        public final String tableId;
         public final DictId dictId;
         public final int checksum;
         public final int size;
 
         public LightweightCompressionDictionary(String keyspaceName,
                                                 String tableName,
+                                                String tableId,
                                                 DictId dictId,
                                                 int checksum,
                                                 int size)
         {
             this.keyspaceName = keyspaceName;
             this.tableName = tableName;
+            this.tableId = tableId;
             this.dictId = dictId;
             this.checksum = checksum;
             this.size = size;
         }
+
+        @Override
+        public String toString()
+        {
+            return "LightweightCompressionDictionary{" +
+                   "keyspaceName='" + keyspaceName + '\'' +
+                   ", tableName='" + tableName + '\'' +
+                   ", tableId='" + tableId + '\'' +
+                   ", dictId=" + dictId +
+                   ", checksum=" + checksum +
+                   ", size=" + size +
+                   '}';
+        }
     }
 }
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
index 3802c10aba..588bb418d3 100644
--- 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
+++ 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
@@ -39,16 +39,25 @@ import static java.lang.String.format;
 
 public class CompressionDictionaryDetailsTabularData
 {
+    /**
+     * Position inside index names of tabular type of tabular data returned upon
+     * listing dictionaries where table id is expected to be located.
+     * This entry does not always need to be processed; e.g. it is not needed
+     * when orphaned compression dictionaries are not being listed.
+     */
+    public static final int TABULAR_DATA_TYPE_TABLE_ID_INDEX = 2;
+
     /**
      * Position inside index names of tabular type of tabular data returned 
upon
      * listing dictionaries where raw dictionary is expected to be located.
      * We do not need to process this entry as listing does not contain any 
raw dictionary,
      * only exporting does.
      */
-    public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3;
+    public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 4;
 
     public static final String KEYSPACE_NAME = "Keyspace";
     public static final String TABLE_NAME = "Table";
+    public static final String TABLE_ID_NAME = "TableId";
     public static final String DICT_ID_NAME = "DictId";
     public static final String DICT_NAME = "Dict";
     public static final String KIND_NAME = "Kind";
@@ -58,6 +67,7 @@ public class CompressionDictionaryDetailsTabularData
 
     private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME,
                                                              TABLE_NAME,
+                                                             TABLE_ID_NAME,
                                                              DICT_ID_NAME,
                                                              DICT_NAME,
                                                              KIND_NAME,
@@ -66,6 +76,7 @@ public class CompressionDictionaryDetailsTabularData
 
     private static final String[] ITEM_DESCS = new String[]{ "keyspace",
                                                              "table",
+                                                             "table_id",
                                                              "dictionary_id",
                                                              
"dictionary_bytes",
                                                              "kind",
@@ -84,6 +95,7 @@ public class CompressionDictionaryDetailsTabularData
         {
             ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace
                                          SimpleType.STRING, // table
+                                         SimpleType.STRING, // tableId
                                          SimpleType.LONG, // dict id
                                          new 
ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes
                                          SimpleType.STRING, // kind
@@ -115,6 +127,7 @@ public class CompressionDictionaryDetailsTabularData
                                             {
                                             dictionary.keyspaceName,
                                             dictionary.tableName,
+                                            dictionary.tableId,
                                             dictionary.dictId.id,
                                             null, // on purpose not returning 
actual dictionary
                                             dictionary.dictId.kind.name(),
@@ -133,10 +146,11 @@ public class CompressionDictionaryDetailsTabularData
      *
      * @param keyspace   keyspace of a dictionary
      * @param table      table of a dictionary
+     * @param tableId    id of a table dictionary is for
      * @param dictionary dictionary itself
      * @return composite data representing dictionary
      */
-    public static CompositeData fromCompressionDictionary(String keyspace, 
String table, CompressionDictionary dictionary)
+    public static CompositeData fromCompressionDictionary(String keyspace, 
String table, String tableId, CompressionDictionary dictionary)
     {
         try
         {
@@ -146,6 +160,7 @@ public class CompressionDictionaryDetailsTabularData
                                             {
                                             keyspace,
                                             table,
+                                            tableId,
                                             dictionary.dictId().id,
                                             dictionary.rawDictionary(),
                                             dictionary.kind().name(),
@@ -176,6 +191,7 @@ public class CompressionDictionaryDetailsTabularData
                                             {
                                             dataObject.keyspace,
                                             dataObject.table,
+                                            dataObject.tableId,
                                             dataObject.dictId,
                                             dataObject.dict,
                                             dataObject.kind,
@@ -200,6 +216,7 @@ public class CompressionDictionaryDetailsTabularData
     {
         return new CompressionDictionaryDataObject((String) 
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME),
                                                    (String) 
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME),
+                                                   (String) 
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_ID_NAME),
                                                    (Long) 
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME),
                                                    (byte[]) 
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME),
                                                    (String) 
compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME),
@@ -211,6 +228,7 @@ public class CompressionDictionaryDetailsTabularData
     {
         public final String keyspace;
         public final String table;
+        public final String tableId;
         public final long dictId;
         public final byte[] dict;
         public final String kind;
@@ -220,6 +238,7 @@ public class CompressionDictionaryDetailsTabularData
         @JsonCreator
         public CompressionDictionaryDataObject(@JsonProperty("keyspace") 
String keyspace,
                                                @JsonProperty("table") String 
table,
+                                               @JsonProperty("tableId") String 
tableId,
                                                @JsonProperty("dictId") long 
dictId,
                                                @JsonProperty("dict") byte[] 
dict,
                                                @JsonProperty("kind") String 
kind,
@@ -228,6 +247,7 @@ public class CompressionDictionaryDetailsTabularData
         {
             this.keyspace = keyspace;
             this.table = table;
+            this.tableId = tableId;
             this.dictId = dictId;
             this.dict = dict;
             this.kind = kind;
@@ -241,7 +261,7 @@ public class CompressionDictionaryDetailsTabularData
          * An object of this class is considered to be valid if:
          *
          * <ul>
-         *     <li>keyspace and table are not null</li>
+         *     <li>keyspace, table and table id are not null</li>
          *     <li>dict id is lower than 0</li>
          *     <li>dict is not null nor empty</li>
          *     <li>dict length is less than or equal to 1MiB</li>
@@ -257,6 +277,8 @@ public class CompressionDictionaryDetailsTabularData
                 throw new IllegalArgumentException("Keyspace not specified.");
             if (table == null)
                 throw new IllegalArgumentException("Table not specified.");
+            if (tableId == null)
+                throw new IllegalArgumentException("Table id not specified");
             if (dictId <= 0)
                 throw new IllegalArgumentException("Provided dictionary id 
must be positive but it is '" + dictId + "'.");
             if (dict == null || dict.length == 0)
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
index c2b0a32a76..ed4d270d0c 100644
--- 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
+++ 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
@@ -87,7 +87,7 @@ public class CompressionDictionaryEventHandler implements 
ICompressionDictionary
                     return;
                 }
 
-                CompressionDictionary dictionary = 
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, 
tableName, dictionaryId.id);
+                CompressionDictionary dictionary = 
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, 
tableName, cfs.metadata().id.toLongString(), dictionaryId.id);
                 cache.add(dictionary);
             }
             catch (Exception e)
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
index 2e05fb9681..8adefad01d 100644
--- 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
+++ 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
@@ -57,6 +57,7 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
 
     private final String keyspaceName;
     private final String tableName;
+    private final String tableId;
     private final ColumnFamilyStore columnFamilyStore;
     private volatile boolean mbeanRegistered;
     private volatile boolean isEnabled;
@@ -71,12 +72,13 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
     {
         this.keyspaceName = columnFamilyStore.keyspace.getName();
         this.tableName = columnFamilyStore.getTableName();
+        this.tableId = columnFamilyStore.metadata().id.toLongString();
         this.columnFamilyStore = columnFamilyStore;
 
         this.isEnabled = 
columnFamilyStore.metadata().params.compression.isDictionaryCompressionEnabled();
         this.cache = new CompressionDictionaryCache();
         this.eventHandler = new 
CompressionDictionaryEventHandler(columnFamilyStore, cache);
-        this.scheduler = new CompressionDictionaryScheduler(keyspaceName, 
tableName, cache, isEnabled);
+        this.scheduler = new CompressionDictionaryScheduler(keyspaceName, 
tableName, tableId, cache, isEnabled);
         if (isEnabled)
         {
             // Initialize components
@@ -253,7 +255,7 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
     @Override
     public TabularData listCompressionDictionaries()
     {
-        List<LightweightCompressionDictionary> dictionaries = 
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName,
 tableName);
+        List<LightweightCompressionDictionary> dictionaries = 
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName,
 tableName, tableId);
         TabularDataSupport tableData = new 
TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
 
         if (dictionaries == null)
@@ -272,21 +274,21 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
     @Override
     public CompositeData getCompressionDictionary()
     {
-        CompressionDictionary compressionDictionary = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, 
tableName);
+        CompressionDictionary compressionDictionary = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, 
tableName, tableId);
         if (compressionDictionary == null)
             return null;
 
-        return 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, 
tableName, compressionDictionary);
+        return 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, 
tableName, tableId, compressionDictionary);
     }
 
     @Override
     public CompositeData getCompressionDictionary(long dictId)
     {
-        CompressionDictionary compressionDictionary = 
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, 
tableName, dictId);
+        CompressionDictionary compressionDictionary = 
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, 
tableName, tableId, dictId);
         if (compressionDictionary == null)
             return null;
 
-        return 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, 
tableName, compressionDictionary);
+        return 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, 
tableName, tableId, compressionDictionary);
     }
 
     @Override
@@ -316,7 +318,7 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
 
         CompressionDictionary.DictId dictId = new 
CompressionDictionary.DictId(kind, dataObject.dictId);
 
-        LightweightCompressionDictionary latestCompressionDictionary = 
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
 tableName);
+        LightweightCompressionDictionary latestCompressionDictionary = 
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
 tableName, tableId);
         if (latestCompressionDictionary != null && 
latestCompressionDictionary.dictId.id > dictId.id)
         {
             throw new IllegalArgumentException(format("Dictionary to import 
has older dictionary id (%s) than the latest compression dictionary (%s) for 
table %s.%s",
@@ -420,7 +422,7 @@ public class CompressionDictionaryManager implements 
CompressionDictionaryManage
             return;
         }
 
-        SystemDistributedKeyspace.storeCompressionDictionary(keyspaceName, 
tableName, dictionary);
+        SystemDistributedKeyspace.storeCompressionDictionary(keyspaceName, 
tableName, tableId, dictionary);
         cache.add(dictionary);
     }
 
diff --git 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
index e757364711..c6f5b6a922 100644
--- 
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
+++ 
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
@@ -51,6 +51,7 @@ public class CompressionDictionaryScheduler implements 
ICompressionDictionarySch
 
     private final String keyspaceName;
     private final String tableName;
+    private final String tableId;
     private final ICompressionDictionaryCache cache;
     private final AtomicBoolean manualTrainingInProgress = new 
AtomicBoolean(false);
 
@@ -59,11 +60,13 @@ public class CompressionDictionaryScheduler implements 
ICompressionDictionarySch
 
     public CompressionDictionaryScheduler(String keyspaceName,
                                           String tableName,
+                                          String tableId,
                                           ICompressionDictionaryCache cache,
                                           boolean isEnabled)
     {
         this.keyspaceName = keyspaceName;
         this.tableName = tableName;
+        this.tableId = tableId;
         this.cache = cache;
         this.isEnabled = isEnabled;
     }
@@ -135,7 +138,7 @@ public class CompressionDictionaryScheduler implements 
ICompressionDictionarySch
                 return;
             }
 
-            CompressionDictionary dictionary = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, 
tableName);
+            CompressionDictionary dictionary = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, 
tableName, tableId);
             cache.add(dictionary);
         }
         catch (Exception e)
diff --git 
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 04ccb54345..31141b6852 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.CommonRange;
 import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.TimeUUID;
 
@@ -199,13 +200,14 @@ public final class SystemDistributedKeyspace
     public static final String COMPRESSION_DICTIONARIES_CQL = "CREATE TABLE IF 
NOT EXISTS %s (" +
                                                               "keyspace_name 
text," +
                                                               "table_name 
text," +
+                                                              "table_id text," 
+
                                                               "kind text," +
                                                               "dict_id 
bigint," +
                                                               "dict blob," +
                                                               "dict_length 
int," +
                                                               "dict_checksum 
int," +
-                                                              "PRIMARY KEY 
((keyspace_name, table_name), dict_id)) " +
-                                                              "WITH CLUSTERING 
ORDER BY (dict_id DESC)"; // in order to retrieve the latest dictionary; the 
contract is the newer the dictionary the larger the dict_id
+                                                              "PRIMARY KEY 
((keyspace_name, table_name), table_id, dict_id)) " +
+                                                              "WITH CLUSTERING 
ORDER BY (table_id DESC, dict_id DESC)"; // in order to retrieve the latest 
dictionary; the contract is the newer the dictionary the larger the dict_id
 
     private static final TableMetadata CompressionDictionariesTable =
         parse(COMPRESSION_DICTIONARIES, "Compression dictionaries for 
applicable tables", COMPRESSION_DICTIONARIES_CQL).build();
@@ -415,17 +417,22 @@ public final class SystemDistributedKeyspace
      *
      * @param keyspaceName the keyspace name to associate with the dictionary
      * @param tableName the table name to associate with the dictionary
+     * @param tableId the unique id of a table to associate with the dictionary
      * @param dictionary the compression dictionary to store
      */
-    public static void storeCompressionDictionary(String keyspaceName, String 
tableName, CompressionDictionary dictionary)
+    public static void storeCompressionDictionary(String keyspaceName,
+                                                  String tableName,
+                                                  String tableId,
+                                                  CompressionDictionary 
dictionary)
     {
         byte[] dict = dictionary.rawDictionary();
-        String query = "INSERT INTO %s.%s (keyspace_name, table_name, kind, 
dict_id, dict, dict_length, dict_checksum) VALUES ('%s', '%s', '%s', %s, ?, %s, 
%s)";
+        String query = "INSERT INTO %s.%s (keyspace_name, table_name, 
table_id, kind, dict_id, dict, dict_length, dict_checksum) VALUES ('%s', '%s', 
'%s', '%s', %s, ?, %s, %s)";
         String fmtQuery = format(query,
                                  SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                  COMPRESSION_DICTIONARIES,
                                  keyspaceName,
                                  tableName,
+                                 tableId,
                                  dictionary.kind(),
                                  dictionary.dictId().id,
                                  dict.length,
@@ -440,14 +447,15 @@ public final class SystemDistributedKeyspace
      *
      * @param keyspaceName the keyspace name to retrieve the dictionary for
      * @param tableName the table name to retrieve the dictionary for
+     * @param tableId the id of the table to retrieve the dictionary for
      * @return the latest compression dictionary for the specified keyspace 
and table,
      *         or null if no dictionary exists or if an error occurs during 
retrieval
      */
     @Nullable
-    public static CompressionDictionary 
retrieveLatestCompressionDictionary(String keyspaceName, String tableName)
+    public static CompressionDictionary 
retrieveLatestCompressionDictionary(String keyspaceName, String tableName, 
String tableId)
     {
-        String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum 
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' LIMIT 1";
-        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName);
+        String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum 
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' LIMIT 
1";
+        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName, tableId);
         try
         {
             return 
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery, 
ConsistencyLevel.ONE).one());
@@ -464,14 +472,15 @@ public final class SystemDistributedKeyspace
      *
      * @param keyspaceName the keyspace name to retrieve the dictionary for
      * @param tableName the table name to retrieve the dictionary for
+     * @param tableId the table id to retrieve the dictionary for
      * @return the latest compression dictionary for the specified keyspace 
and table,
      *         or null if no dictionary exists or if an error occurs during 
retrieval
      */
     @Nullable
-    public static LightweightCompressionDictionary 
retrieveLightweightLatestCompressionDictionary(String keyspaceName, String 
tableName)
+    public static LightweightCompressionDictionary 
retrieveLightweightLatestCompressionDictionary(String keyspaceName, String 
tableName, String tableId)
     {
-        String query = "SELECT keyspace_name, table_name, kind, dict_id, 
dict_checksum, dict_length FROM %s.%s WHERE keyspace_name='%s' AND 
table_name='%s' LIMIT 1";
-        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName);
+        String query = "SELECT keyspace_name, table_name, table_id, kind, 
dict_id, dict_checksum, dict_length FROM %s.%s WHERE keyspace_name='%s' AND 
table_name='%s' AND table_id='%s' LIMIT 1";
+        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName, tableId);
         try
         {
             return 
CompressionDictionary.createFromRowLightweight(QueryProcessor.execute(fmtQuery, 
ConsistencyLevel.ONE).one());
@@ -488,15 +497,16 @@ public final class SystemDistributedKeyspace
      *
      * @param keyspaceName the keyspace name to retrieve the dictionary for
      * @param tableName the table name to retrieve the dictionary for
+     * @param tableId the table id to retrieve the dictionary for
      * @param dictionaryId the dictionary id to retrieve the dictionary for
      * @return the compression dictionary identified by the specified 
keyspace, table and dictionaryId,
      *         or null if no dictionary exists or if an error occurs during 
retrieval
      */
     @Nullable
-    public static CompressionDictionary retrieveCompressionDictionary(String 
keyspaceName, String tableName, long dictionaryId)
+    public static CompressionDictionary retrieveCompressionDictionary(String 
keyspaceName, String tableName, String tableId, long dictionaryId)
     {
-        String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum 
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND dict_id=%s";
-        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName, dictionaryId);
+        String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum 
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' AND 
dict_id=%s";
+        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName, tableId, dictionaryId);
         try
         {
             return 
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery, 
ConsistencyLevel.ONE).one());
@@ -512,17 +522,87 @@ public final class SystemDistributedKeyspace
      *
      * @param keyspaceName the keyspace name to retrieve the dictionary for
      * @param tableName the table name to retrieve the dictionary for
+     * @param tableId the table id to retrieve the dictionary for
      * @return the compression dictionaries identified by the specified 
keyspace and table,
-     *         or null if no dictionary exists or if an error occurs during 
retrieval
+     *         empty list if no dictionary exists or null if an error occurs 
during retrieval
+     */
+    @Nullable
+    public static List<LightweightCompressionDictionary> retrieveLightweightCompressionDictionaries(String keyspaceName, String tableName, String tableId)
+    {
+        // restrict the lightweight listing to a single table incarnation (keyspace, name and id)
+        return retrieveLightweightCompressionDictionariesInternal(format("SELECT keyspace_name, table_name, table_id, kind, dict_id, dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s'",
+                                                                         SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                                                         COMPRESSION_DICTIONARIES,
+                                                                         keyspaceName,
+                                                                         tableName,
+                                                                         tableId));
+    }
+
+    /**
+     * Retrieves the compression dictionaries of all keyspaces and tables in a lightweight form, i.e. without
+     * the raw dictionary bytes (the {@code dict} column is not selected). The query has no WHERE clause,
+     * so the whole compression dictionaries table is read.
+     *
+     * @return all compression dictionaries in a lightweight form, an empty list if no dictionary exists,
+     *         or null if an error occurs during retrieval
      */
      @Nullable
-    public static List<LightweightCompressionDictionary> 
retrieveLightweightCompressionDictionaries(String keyspaceName, String 
tableName)
+    public static List<LightweightCompressionDictionary> 
retrieveLightweightCompressionDictionaries()
+    {
+        String query = "SELECT keyspace_name, table_name, table_id, kind, 
dict_id, dict_length, dict_checksum FROM %s.%s";
+        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES);
+        return retrieveLightweightCompressionDictionariesInternal(fmtQuery);
+    }
+
+    /**
+     * Retrieves all orphaned compression dictionaries in a lightweight form.
+     *
+     * @return all orphaned compression dictionaries, lightweight form, empty 
list if no dictionary exists
+     *         or null if an error occurs during retrieval
+     */
+    public static List<LightweightCompressionDictionary> 
retrieveOrphanedLightweightCompressionDictionaries()
+    {
+        List<LightweightCompressionDictionary> dicts = 
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+        if (dicts == null || dicts.isEmpty())
+            return List.of();
+
+        List<LightweightCompressionDictionary> orphaned = new ArrayList<>();
+        for (LightweightCompressionDictionary dict : dicts)
+        {
+            TableMetadata tableMetadata = 
ClusterMetadata.current().schema.getTableMetadata(dict.keyspaceName, 
dict.tableName);
+            if (tableMetadata == null || 
!tableMetadata.id.toLongString().equals(dict.tableId))
+                orphaned.add(dict);
+        }
+
+        return orphaned;
+    }
+
+    /**
+     * Removes all orphaned compression dictionaries, as determined by
+     * {@link #retrieveOrphanedLightweightCompressionDictionaries()}.
+     * <p>
+     * Deletion is best-effort: a failure to delete one dictionary is logged, including its stack trace,
+     * and does not prevent the remaining orphaned dictionaries from being deleted.
+     */
+    public static void clearOrphanedCompressionDictionaries()
+    {
+        for (LightweightCompressionDictionary orphanedDict : SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries())
+        {
+            try
+            {
+                QueryProcessor.execute(String.format("DELETE FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' AND dict_id=%s",
+                                                     SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+                                                     SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
+                                                     orphanedDict.keyspaceName,
+                                                     orphanedDict.tableName,
+                                                     orphanedDict.tableId,
+                                                     orphanedDict.dictId.id),
+                                       ConsistencyLevel.ONE);
+            }
+            catch (Exception e)
+            {
+                // the exception is passed as the trailing argument so SLF4J logs its stack trace as well
+                logger.error("Unable to delete orphaned compression dictionary: {}, Reason: {}",
+                             orphanedDict.toString(),
+                             e.getMessage(),
+                             e);
+            }
+        }
+    }
+
+    private static List<LightweightCompressionDictionary> 
retrieveLightweightCompressionDictionariesInternal(String query)
     {
-        String query = "SELECT keyspace_name, table_name, kind, dict_id, 
dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND 
table_name='%s'";
-        String fmtQuery = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, 
keyspaceName, tableName);
         try
         {
-            UntypedResultSet result = QueryProcessor.execute(fmtQuery, 
ConsistencyLevel.ONE);
+            UntypedResultSet result = QueryProcessor.execute(query, 
ConsistencyLevel.ONE);
             if (result.isEmpty())
                 return Collections.emptyList();
             List<LightweightCompressionDictionary> dictionaries = new 
ArrayList<>();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index dc9d9e3f68..c9c8f4ed10 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -60,6 +60,7 @@ import javax.management.NotificationListener;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
 
 import com.codahale.metrics.Meter;
 import com.google.common.annotations.VisibleForTesting;
@@ -105,6 +106,8 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
+import 
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import 
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.BootStrapper;
@@ -5782,4 +5785,25 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
         return sstablesTouched;
     }
+
+    @Override
+    public TabularData getOrphanedCompressionDictionaries()
+    {
+        // the retrieval never yields null; with no orphans the loop is skipped and an empty table is returned
+        List<LightweightCompressionDictionary> orphans = SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries();
+        TabularDataSupport result = new TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
+        for (LightweightCompressionDictionary orphan : orphans)
+            result.put(CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(orphan));
+        return result;
+    }
+
+    @Override
+    public void clearOrphanedCompressionDictionaries()
+    {
+        // detection, deletion and per-dictionary error logging are all done by SystemDistributedKeyspace
+        SystemDistributedKeyspace.clearOrphanedCompressionDictionaries();
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be54b4fc76..da03f51a47 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1400,4 +1400,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
 
     /** Mutates the repaired state of all SSTables for the given SSTables */
     public List<String> mutateSSTableRepairedState(boolean repaired, boolean 
preview, String keyspace, List<String> tables);
+
+    /**
+     * Returns the orphaned compression dictionaries, that is, dictionaries stored in
+     * system_distributed.compression_dictionaries whose table no longer exists or was re-created
+     * with a different table id.
+     *
+     * @return tabular data describing the orphaned dictionaries
+     */
+    TabularData getOrphanedCompressionDictionaries();
+
+    /**
+     * Deletes all orphaned compression dictionaries, see {@link #getOrphanedCompressionDictionaries()}.
+     */
+    void clearOrphanedCompressionDictionaries();
 }
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 52ed11509b..567df09283 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -2819,6 +2819,20 @@ public class NodeProbe implements AutoCloseable
         return TrainingState.fromCompositeData(compositeData);
     }
 
+    /**
+     * Gets all compression dictionaries which are orphaned, i.e. dictionaries whose table was dropped
+     * (or re-created under the same name with a different table id).
+     * @return tabular data of orphaned dictionaries
+     */
+    public TabularData getOrphanedCompressionDictionaries()
+    {
+        return ssProxy.getOrphanedCompressionDictionaries();
+    }
+
+    /**
+     * Removes all orphaned compression dictionaries via the StorageService MBean.
+     */
+    public void clearOrphanedCompressionDictionaries()
+    {
+        ssProxy.clearOrphanedCompressionDictionaries();
+    }
+
+
     private CompressionDictionaryManagerMBean getDictionaryManagerProxy(String 
keyspace, String table) throws IOException
     {
         // Construct table-specific MBean name
diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
 
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
index 757260f66e..22a3affcbf 100644
--- 
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
+++ 
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
@@ -37,6 +38,7 @@ import 
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.Trainin
 import org.apache.cassandra.db.compression.TrainingState;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
 import org.apache.cassandra.utils.Clock;
@@ -61,7 +63,8 @@ import static 
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MA
          subcommands = { 
CompressionDictionaryCommandGroup.TrainDictionary.class,
                          
CompressionDictionaryCommandGroup.ListDictionaries.class,
                          
CompressionDictionaryCommandGroup.ExportDictionary.class,
-                         
CompressionDictionaryCommandGroup.ImportDictionary.class })
+                         
CompressionDictionaryCommandGroup.ImportDictionary.class,
+                         
CompressionDictionaryCommandGroup.CleanupDictionaries.class})
 public class CompressionDictionaryCommandGroup
 {
     @Command(name = "train",
@@ -216,40 +219,43 @@ public class CompressionDictionaryCommandGroup
         @Override
         protected void execute(NodeProbe probe)
         {
-            try
-            {
-                TableBuilder tableBuilder = new TableBuilder();
-                TabularData tabularData = 
probe.listCompressionDictionaries(keyspace, table);
-                List<String> indexNames = 
tabularData.getTabularType().getIndexNames();
-
-                List<String> columns = new ArrayList<>(indexNames);
-                // ignore raw dict
-                
columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX);
-                tableBuilder.add(columns);
-
-                for (Object eachDict : tabularData.keySet())
-                {
-                    final List<?> dictRow = (List<?>) eachDict;
-
-                    List<String> rowValues = new ArrayList<>();
-
-                    for (int i = 0; i < dictRow.size(); i++)
-                    {
-                        // ignore raw dict
-                        if (i == 
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX)
-                            continue;
+            // Lists the dictionaries of a single table, hiding the table id and raw dictionary columns.
+            // Anything thrown by the JMX call is wrapped in a RuntimeException so it can escape the Supplier;
+            // ListingHelper then reports it and exits.
+            // NOTE(review): catching Throwable also wraps Errors; consider narrowing if the callee's throws clause allows it.
+            ListingHelper.list(probe,
+                               () ->
+                               {
+                                   try
+                                   {
+                                       return 
probe.listCompressionDictionaries(keyspace, table);
+                                   }
+                                   catch (Throwable t)
+                                   {
+                                       throw new RuntimeException(t);
+                                   }
+                               },
+                               
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_TABLE_ID_INDEX,
+                                       
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX));
+        }
+    }
 
-                        rowValues.add(dictRow.get(i).toString());
-                    }
-                    tableBuilder.add(rowValues);
-                }
+    /**
+     * {@code nodetool compressiondictionary cleanup [--dry]}: lists ({@code --dry}) or deletes orphaned
+     * compression dictionaries through the StorageService MBean of the node nodetool is connected to.
+     */
+    @Command(name = "cleanup", description = "Clean up orphaned dictionaries 
by deleting them from " + SystemDistributedKeyspace.NAME
+                                             + '.' + 
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES +
+                                             " table, these are ones for which 
a table they were trained for was dropped.")
+    public static class CleanupDictionaries extends AbstractCommand
+    {
+        @Option(names = {"-d", "--dry"}, description = "Only display orphaned 
dictionaries, do not remove them.")
+        private boolean dry;
 
-                tableBuilder.printTo(probe.output().out);
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            // In dry mode only the raw dictionary column is hidden. Unlike `list`, the table id stays visible,
+            // since it distinguishes a dictionary of a dropped table from one of a re-created table of the same name.
+            if (dry)
+            {
+                ListingHelper.list(probe,
+                                   probe::getOrphanedCompressionDictionaries,
+                                   
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX));
             }
-            catch (Exception e)
+            else
             {
-                probe.output().err.printf("Failed to list dictionaries: %s%n", 
e.getMessage());
-                System.exit(1);
+                // NOTE(review): nothing is printed on success and the number of removed dictionaries is not reported
+                probe.clearOrphanedCompressionDictionaries();
             }
         }
     }
@@ -368,4 +374,66 @@ public class CompressionDictionaryCommandGroup
             }
         }
     }
+
+    private static class ListingHelper
+    {
+        /**
+         * Prints dictionaries as a table. How the dictionaries are obtained is up to the calling subcommand.
+         * Nothing, not even the header, is printed when there is no row to display.
+         * <p>
+         * On failure, an error message is printed to the probe's error stream and the JVM exits with status 1.
+         *
+         * @param probe                 probe whose output streams are used for printing
+         * @param tabularDataSupplier   supplier of tabular data containing the dictionaries to display
+         * @param removedColumnsIndices indexes of the columns in the tabular data which are not displayed
+         */
+        public static void list(NodeProbe probe,
+                                Supplier<TabularData> tabularDataSupplier,
+                                List<Integer> removedColumnsIndices)
+        {
+            try
+            {
+                TableBuilder tableBuilder = new TableBuilder();
+                TabularData tabularData = tabularDataSupplier.get();
+                List<String> indexNames = tabularData.getTabularType().getIndexNames();
+
+                // ignore columns not meant to be displayed to a user
+                List<String> columns = new ArrayList<>();
+                for (int i = 0; i < indexNames.size(); i++)
+                {
+                    if (!removedColumnsIndices.contains(i))
+                        columns.add(indexNames.get(i));
+                }
+
+                tableBuilder.add(columns);
+
+                boolean hasOutput = false;
+                for (Object eachDict : tabularData.keySet())
+                {
+                    final List<?> dictRow = (List<?>) eachDict;
+
+                    List<String> rowValues = new ArrayList<>();
+
+                    for (int i = 0; i < dictRow.size(); i++)
+                    {
+                        // ignore columns not meant to be displayed to a user
+                        if (removedColumnsIndices.contains(i))
+                            continue;
+
+                        // String.valueOf: a null cell must not make the whole listing fail
+                        rowValues.add(String.valueOf(dictRow.get(i)));
+                        hasOutput = true;
+                    }
+                    tableBuilder.add(rowValues);
+                }
+
+                // print nothing, not even the header, when there is nothing to list
+                if (hasOutput)
+                    tableBuilder.printTo(probe.output().out);
+            }
+            catch (Exception e)
+            {
+                // Suppliers wrap checked failures in a plain RuntimeException; report the original message
+                // instead of the wrapper's "ExceptionClass: message" form
+                Throwable reported = e.getClass() == RuntimeException.class && e.getCause() != null ? e.getCause() : e;
+                probe.output().err.printf("Failed to list dictionaries: %s%n", reported.getMessage());
+                System.exit(1);
+            }
+        }
+    }
 }
diff --git a/test/resources/nodetool/help/compressiondictionary 
b/test/resources/nodetool/help/compressiondictionary
index 92b9ecbe5d..52648bf1bc 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary
@@ -34,6 +34,12 @@ SYNOPSIS
                 [(-u <username> | --username <username>)] 
compressiondictionary import
                 [--] <dictionaryPath>
 
+        nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+                [(-pp | --print-port)] [(-pw <password> | --password 
<password>)]
+                [(-pwf <passwordFilePath> | --password-file 
<passwordFilePath>)]
+                [(-u <username> | --username <username>)] 
compressiondictionary cleanup
+                [(-d | --dry)]
+
 OPTIONS
         -h <host>, --host <host>
             Node hostname or ip address
@@ -82,3 +88,9 @@ COMMANDS
             dictionary is returned.
         import
             Import local dictionary to Cassandra.
+        cleanup
+            Clean up orphaned dictionaries by deleting them from 
system_distributed.
+            compression_dictionaries table, these are ones for which a table 
they were
+            trained for was dropped.
+
+            With --dry option, Only display orphaned dictionaries, do not 
remove them.
diff --git a/test/resources/nodetool/help/compressiondictionary$cleanup 
b/test/resources/nodetool/help/compressiondictionary$cleanup
new file mode 100644
index 0000000000..6fae13d218
--- /dev/null
+++ b/test/resources/nodetool/help/compressiondictionary$cleanup
@@ -0,0 +1,34 @@
+NAME
+        nodetool compressiondictionary cleanup - Clean up orphaned dictionaries
+        by deleting them from system_distributed.compression_dictionaries
+        table, these are ones for which a table they were trained for was
+        dropped.
+
+SYNOPSIS
+        nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+                [(-pp | --print-port)] [(-pw <password> | --password 
<password>)]
+                [(-pwf <passwordFilePath> | --password-file 
<passwordFilePath>)]
+                [(-u <username> | --username <username>)] 
compressiondictionary cleanup
+                [(-d | --dry)]
+
+OPTIONS
+        -d, --dry
+            Only display orphaned dictionaries, do not remove them.
+
+        -h <host>, --host <host>
+            Node hostname or ip address
+
+        -p <port>, --port <port>
+            Remote jmx agent port number
+
+        -pp, --print-port
+            Operate in 4.0 mode with hosts disambiguated by port number
+
+        -pw <password>, --password <password>
+            Remote jmx agent password
+
+        -pwf <passwordFilePath>, --password-file <passwordFilePath>
+            Path to the JMX password file
+
+        -u <username>, --username <username>
+            Remote jmx agent username
diff --git 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
index 47be2334fd..268c8222bd 100644
--- 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db.compression;
 
+import java.util.UUID;
 import java.util.function.Consumer;
 
 import javax.management.openmbean.CompositeData;
@@ -38,6 +39,7 @@ public class CompressionDictionaryDataObjectTest
 {
     private static final String KEYSPACE = "ks";
     private static final String TABLE = "tb";
+    private static final String TABLE_ID = UUID.randomUUID().toString();
     private static final CompressionDictionary COMPRESSION_DICTIONARY = 
CompressionDictionaryHelper.INSTANCE.trainDictionary(KEYSPACE, TABLE);
     private static final CompressionDictionaryDataObject VALID_OBJECT = 
createValidObject();
 
@@ -59,7 +61,7 @@ public class CompressionDictionaryDataObjectTest
     @Test
     public void testConversionOfCompressionDictionaryToDataObject()
     {
-        CompositeData compositeData = 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(KEYSPACE, 
TABLE, COMPRESSION_DICTIONARY);
+        CompositeData compositeData = 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(KEYSPACE, 
TABLE, TABLE_ID, COMPRESSION_DICTIONARY);
         CompressionDictionaryDataObject dataObject = 
CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData);
 
         assertEquals(KEYSPACE, dataObject.keyspace);
@@ -76,6 +78,7 @@ public class CompressionDictionaryDataObjectTest
     {
         LightweightCompressionDictionary lightweight = new 
LightweightCompressionDictionary(KEYSPACE,
                                                                                
             TABLE,
+                                                                               
             TABLE_ID,
                                                                                
             COMPRESSION_DICTIONARY.dictId(),
                                                                                
             COMPRESSION_DICTIONARY.checksum(),
                                                                                
             COMPRESSION_DICTIONARY.rawDictionary().length);
@@ -84,6 +87,7 @@ public class CompressionDictionaryDataObjectTest
 
         assertEquals(KEYSPACE, 
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME));
         assertEquals(TABLE, 
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME));
+        assertEquals(TABLE_ID, 
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_ID_NAME));
         assertEquals(COMPRESSION_DICTIONARY.dictId().id, 
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME));
         
assertNull(compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME));
         assertEquals(COMPRESSION_DICTIONARY.dictId().kind.name(), 
compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME));
@@ -96,6 +100,7 @@ public class CompressionDictionaryDataObjectTest
     {
         assertInvalid(modifier -> modifier.withKeyspace(null), "Keyspace not 
specified.");
         assertInvalid(modifier -> modifier.withTable(null), "Table not 
specified.");
+        assertInvalid(modifier -> modifier.withTableId(null), "Table id not 
specified");
         assertInvalid(modifier -> modifier.withDictId(-1), "Provided 
dictionary id must be positive but it is '-1'.");
         assertInvalid(modifier -> modifier.withDict(null), "Provided 
dictionary byte array is null or empty.");
         assertInvalid(modifier -> modifier.withDict(new byte[0]), "Provided 
dictionary byte array is null or empty.");
@@ -127,6 +132,7 @@ public class CompressionDictionaryDataObjectTest
     {
         return new CompressionDictionaryDataObject("ks",
                                                    "tb",
+                                                   TABLE_ID,
                                                    123,
                                                    
COMPRESSION_DICTIONARY.rawDictionary(),
                                                    
CompressionDictionary.Kind.ZSTD.name(),
@@ -140,6 +146,7 @@ public class CompressionDictionaryDataObjectTest
     {
         private String keyspace;
         private String table;
+        private String tableId;
         private long dictId;
         private byte[] dict;
         private String kind;
@@ -150,6 +157,7 @@ public class CompressionDictionaryDataObjectTest
         {
             withKeyspace(from.keyspace);
             withTable(from.table);
+            withTableId(from.tableId);
             withDictId(from.dictId);
             withDict(from.dict);
             withKind(from.kind);
@@ -159,7 +167,7 @@ public class CompressionDictionaryDataObjectTest
 
         public CompressionDictionaryDataObject build()
         {
+            // fields are passed through unchanged, so invalid values (e.g. a null tableId) reach the
+            // data object's own validation - presumably in its constructor; confirm against CompressionDictionaryDataObject
-            return new CompressionDictionaryDataObject(keyspace, table, 
dictId, dict, kind, dictChecksum, dictLength);
+            return new CompressionDictionaryDataObject(keyspace, table, 
tableId, dictId, dict, kind, dictChecksum, dictLength);
         }
 
         public DataObjectModifier withKeyspace(String keyspace)
@@ -174,6 +182,12 @@ public class CompressionDictionaryDataObjectTest
             return this;
         }
 
+        /** Sets the table id passed to the data object by {@link #build()}; null is used to test validation. */
+        public DataObjectModifier withTableId(String tableId)
+        {
+            this.tableId = tableId;
+            return this;
+        }
+
         public DataObjectModifier withDictId(long dictId)
         {
             this.dictId = dictId;
diff --git 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
new file mode 100644
index 0000000000..b42da12554
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import 
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.tools.ToolRunner.ToolResult;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies detection and cleanup of orphaned compression dictionaries, i.e. dictionaries whose table was
+ * dropped, both through SystemDistributedKeyspace and through {@code nodetool compressiondictionary cleanup}.
+ */
+public class CompressionDictionaryOrphanedTest extends CQLTester
+{
+    private static final String tableName = "mytable";
+
+    @BeforeClass
+    public static void setup() throws Throwable
+    {
+        requireNetwork();
+        startJMXServer();
+    }
+
+    @Test
+    public void testOrphanedCompressionDictionaries()
+    {
+        String firstTableId = createDictTable();
+        trainDictionary();
+        trainDictionary();
+
+        assertDicts(firstTableId);
+
+        // drop that table, so we will have two orphaned
+        dropDictTable();
+        // this will produce orphaned dictionaries - ones without an existing table
+        assertOrphaned(firstTableId);
+
+        // create a new table with the same name and train again; the two dictionaries of the dropped table
+        // remain orphaned because their table id differs from the id of the new table
+        String secondTableId = createDictTable();
+        trainDictionary();
+        trainDictionary();
+
+        // still two from the first run
+        assertOrphaned(firstTableId);
+        // call nodetool, clear orphaned
+        cleanupOrphaned();
+        // verify that orphaned were cleared
+        assertNoOrphaned();
+
+        // now only the second table exists and there are no orphaned dictionaries;
+        // dropping the second table will again produce two orphaned dictionaries
+        assertDicts(secondTableId);
+
+        dropDictTable();
+        assertOrphaned(secondTableId);
+        cleanupOrphaned();
+        assertNoOrphaned();
+    }
+
+    private String getTableId()
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace(), 
tableName);
+        Assert.assertNotNull(cfs);
+        return cfs.metadata.id.toLongString();
+    }
+
+    private String createDictTable()
+    {
+        schemaChange(format("CREATE TABLE %s.%s (id int PRIMARY KEY, data 
text)" +
+                            " WITH compression = {'class': 
'ZstdDictionaryCompressor'}",
+                            keyspace(), tableName));
+
+        return getTableId();
+    }
+
+    private void dropDictTable()
+    {
+        schemaChange(format("DROP TABLE %s.%s", keyspace(), tableName));
+    }
+
+    // expects exactly two dictionaries in the whole dictionaries table, both belonging to the given table id
+    private void assertDicts(String tableId)
+    {
+        List<LightweightCompressionDictionary> dicts = 
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+        assertNotNull(dicts);
+        assertEquals(2, dicts.size());
+        assertEquals(tableId, dicts.get(0).tableId);
+        assertEquals(tableId, dicts.get(1).tableId);
+
+    }
+
+    // checks the orphans both via SystemDistributedKeyspace and via the nodetool dry-run output (header + two rows)
+    private void assertOrphaned(String tableId)
+    {
+        List<LightweightCompressionDictionary> orphaned = 
SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries();
+        assertNotNull(orphaned);
+        assertEquals(2, orphaned.size());
+        assertEquals(tableId, orphaned.get(0).tableId);
+        assertEquals(tableId, orphaned.get(1).tableId);
+
+        ToolResult toolResult = invokeNodetool("compressiondictionary", 
"cleanup", "--dry");
+        toolResult.asserts().success();
+        String[] split = toolResult.getStdout().split(System.lineSeparator());
+        // split[0] is the header
+        assertEquals(3, split.length);
+        assertTrue(split[1].contains(tableId));
+        assertTrue(split[2].contains(tableId));
+    }
+
+    private void cleanupOrphaned()
+    {
+        invokeNodetool("compressiondictionary", "cleanup").asserts().success();
+    }
+
+    // with no orphans the dry run prints nothing at all, not even a table header
+    private void assertNoOrphaned()
+    {
+        ToolResult toolResult = invokeNodetool("compressiondictionary", 
"cleanup", "--dry");
+        toolResult.asserts().success();
+        assertTrue(toolResult.getStdout().isBlank());
+    }
+
+    private void trainDictionary()
+    {
+        createSSTables();
+
+        // Test training command with --force since we have limited test data
+        ToolResult result = invokeNodetool("compressiondictionary", "train", 
"--force", keyspace(), tableName);
+        result.assertOnCleanExit();
+
+        assertThat(result.getStdout())
+        .as("Should indicate training completed")
+        .contains("Training completed successfully")
+        .contains(keyspace())
+        .contains(tableName);
+    }
+
+    // writes 10 flushed batches of 1000 rows each, giving the trainer several sstables to sample from
+    private void createSSTables()
+    {
+        for (int file = 0; file < 10; file++)
+        {
+            int batchSize = 1000;
+            for (int i = 0; i < batchSize; i++)
+            {
+                int index = i + file * batchSize;
+                executeFormattedQuery(format("INSERT INTO %s.%s (id, data) 
VALUES (?, ?)", keyspace(), tableName),
+                                      index, "test data " + index);
+            }
+
+            flush();
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
index e9c5b00b47..e99a318358 100644
--- 
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
@@ -61,9 +61,9 @@ public class CompressionDictionarySchedulerTest extends 
CQLTester
     {
         String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data 
text) " +
                                    "WITH compression = {'class': 
'ZstdDictionaryCompressor'}");
-        scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, cache, 
true);
-
         ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(table);
+        scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, 
cfs.metadata.id.toLongString(), cache, true);
+
         try (CompressionDictionaryManager manager = 
cfs.compressionDictionaryManager())
         {
             Set<SSTableReader> sstables = new HashSet<>();
@@ -81,9 +81,9 @@ public class CompressionDictionarySchedulerTest extends 
CQLTester
     {
         String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data 
text) " +
                                    "WITH compression = {'class': 
'ZstdDictionaryCompressor', 'chunk_length_in_kb': '4'}");
-        scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, cache, 
true);
-
         ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(table);
+        scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, 
cfs.metadata.id.toLongString(), cache, true);
+
         cfs.disableAutoCompaction();
         try (CompressionDictionaryManager manager = 
cfs.compressionDictionaryManager())
         {
diff --git 
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
 
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
index 11b0764a1a..2a3fd24862 100644
--- 
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
+++ 
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.schema;
 
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -79,12 +80,12 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
     @Test
     public void testStoreCompressionDictionary() throws Exception
     {
+        String tableID = UUID.randomUUID().toString();
         // Store a dictionary
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary1);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary1);
 
         // Verify it was stored
-        CompressionDictionary retrieved = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
-        TEST_KEYSPACE, TEST_TABLE);
+        CompressionDictionary retrieved = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID);
 
         assertThat(retrieved)
         .as("Retrieved dictionary should not be null")
@@ -106,15 +107,15 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
     }
 
     @Test
-    public void testStoreMultipleDictionaries() throws Exception
+    public void testStoreMultipleDictionaries()
     {
+        String tableID = UUID.randomUUID().toString();
         // Store multiple dictionaries for the same table
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary1);
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary2);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary1);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary2);
 
         // Should retrieve the latest one (higher ID due to clustering order)
-        CompressionDictionary latest = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
-        TEST_KEYSPACE, TEST_TABLE);
+        CompressionDictionary latest = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID);
 
         assertThat(latest)
         .as("Should retrieve the latest dictionary")
@@ -128,17 +129,18 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
     }
 
     @Test
-    public void testRetrieveSpecificDictionary() throws Exception
+    public void testRetrieveSpecificDictionary()
     {
+        String tableID = UUID.randomUUID().toString();
         // Store both dictionaries
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary1);
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary2);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary1);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary2);
 
         // Retrieve specific dictionary by ID
         CompressionDictionary dict1 = 
SystemDistributedKeyspace.retrieveCompressionDictionary(
-        TEST_KEYSPACE, TEST_TABLE, 100L);
+        TEST_KEYSPACE, TEST_TABLE, tableID, 100L);
         CompressionDictionary dict2 = 
SystemDistributedKeyspace.retrieveCompressionDictionary(
-        TEST_KEYSPACE, TEST_TABLE, 200L);
+        TEST_KEYSPACE, TEST_TABLE, tableID, 200L);
 
         assertThat(dict1)
         .as("Should retrieve dictionary 1")
@@ -165,7 +167,7 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
     {
         // Try to retrieve dictionary that doesn't exist
         CompressionDictionary nonExistent = 
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
-        "nonexistent_keyspace", "nonexistent_table");
+        "nonexistent_keyspace", "nonexistent_table", "nonexistingid");
 
         assertThat(nonExistent)
         .as("Should return null for non-existent dictionary")
@@ -173,7 +175,7 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
 
         // Try to retrieve specific dictionary that doesn't exist
         CompressionDictionary nonExistentById = 
SystemDistributedKeyspace.retrieveCompressionDictionary(
-        TEST_KEYSPACE, TEST_TABLE, 999L);
+        TEST_KEYSPACE, TEST_TABLE, "nonexistingid",999L);
 
         assertThat(nonExistentById)
         .as("Should return null for non-existent dictionary ID")
@@ -181,18 +183,20 @@ public class 
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
     }
 
     @Test
-    public void testStoredDictionaryIncludesLengthAndChecksum() throws 
Exception
+    public void testStoredDictionaryIncludesLengthAndChecksum()
     {
+        String tableID = UUID.randomUUID().toString();
         // Store a dictionary
-        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, testDictionary1);
+        SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE, 
TEST_TABLE, tableID, testDictionary1);
 
         // Query the table directly to verify dict_length and dict_checksum 
are stored
-        String query = String.format("SELECT dict_length, dict_checksum FROM 
%s.%s WHERE keyspace_name = '%s' AND table_name = '%s' AND dict_id = %d",
+        String query = String.format("SELECT dict_length, dict_checksum FROM 
%s.%s WHERE keyspace_name = '%s' AND table_name = '%s' AND dict_id = %d AND 
table_id = '%s'",
                                      SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
                                      
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
                                      TEST_KEYSPACE,
                                      TEST_TABLE,
-                                     testDictionary1.dictId().id);
+                                     testDictionary1.dictId().id,
+                                     tableID);
 
         var resultSet = QueryProcessor.executeInternal(query);
 
diff --git 
a/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
 
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
index ea4e72801b..f831b34703 100644
--- 
a/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
+++ 
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
@@ -63,7 +63,7 @@ public class ExportImportListCompressionDictionaryTest 
extends CQLTester
         importDictionary(pair.right);
 
         list(pair.left, firstTable);
-        list(pair.left, secondTable);
+        list(export(secondTable, null).left, secondTable);
     }
 
     @Test
@@ -87,6 +87,7 @@ public class ExportImportListCompressionDictionaryTest 
extends CQLTester
         // test non-existing keyspace / table
         serializeToJsonFile(new CompressionDictionaryDataObject("abc",
                                                                 "def",
+                                                                "123",
                                                                 
pair.left.dictId,
                                                                 pair.left.dict,
                                                                 pair.left.kind,
@@ -177,6 +178,7 @@ public class ExportImportListCompressionDictionaryTest 
extends CQLTester
     {
         serializeToJsonFile(new 
CompressionDictionaryDataObject(pair.left.keyspace,
                                                                 table,
+                                                                
pair.left.tableId,
                                                                 
pair.left.dictId,
                                                                 pair.left.dict,
                                                                 pair.left.kind,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to