This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3fc830a19c Be able to detect and remove orphaned compression
dictionaries
3fc830a19c is described below
commit 3fc830a19c67b7869398e6929f7be4eb1bd03ae5
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Wed Feb 4 18:49:01 2026 +1100
Be able to detect and remove orphaned compression dictionaries
patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-21157
---
CHANGES.txt | 1 +
.../pages/managing/operating/compression.adoc | 2 +
.../db/compression/CompressionDictionary.java | 18 +++
.../CompressionDictionaryDetailsTabularData.java | 28 +++-
.../CompressionDictionaryEventHandler.java | 2 +-
.../compression/CompressionDictionaryManager.java | 18 ++-
.../CompressionDictionaryScheduler.java | 5 +-
.../schema/SystemDistributedKeyspace.java | 116 +++++++++++---
.../apache/cassandra/service/StorageService.java | 24 +++
.../cassandra/service/StorageServiceMBean.java | 4 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 14 ++
.../CompressionDictionaryCommandGroup.java | 130 +++++++++++----
test/resources/nodetool/help/compressiondictionary | 12 ++
.../nodetool/help/compressiondictionary$cleanup | 34 ++++
.../CompressionDictionaryDataObjectTest.java | 18 ++-
.../CompressionDictionaryOrphanedTest.java | 177 +++++++++++++++++++++
.../CompressionDictionarySchedulerTest.java | 8 +-
...stributedKeyspaceCompressionDictionaryTest.java | 42 ++---
.../ExportImportListCompressionDictionaryTest.java | 4 +-
19 files changed, 569 insertions(+), 88 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a56f0b8df9..f94cd94b4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Be able to detect and remove orphaned compression dictionaries
(CASSANDRA-21157)
* Fix BigTableVerifier to only read a data file during extended verification
(CASSANDRA-21150)
* Reduce memory allocation during transformation of BatchStatement to
Mutation (CASSANDRA-21141)
* Direct I/O support for compaction reads (CASSANDRA-19987)
diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc
b/doc/modules/cassandra/pages/managing/operating/compression.adoc
index a5a06c4e00..97de5c12c0 100644
--- a/doc/modules/cassandra/pages/managing/operating/compression.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -403,6 +403,8 @@ There are these four commands for now related to
compression dictionaries:
* export - a user can export a compression dictionary of a keyspace and a
table, either the last one or
by a specific id, to a file.
* import - a user can import a compression dictionary, exported by above
command, from a file to a cluster.
+* cleanup - removes orphaned dictionaries, i.e. dictionaries whose table
no longer exists (it was dropped).
+A user can list which dictionaries are orphaned, without deleting them, by
using the `-d` / `--dry` flag.
For `train` subcommand, it is possible to specify:
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index bbacacd85c..30f37a0b55 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -261,11 +261,13 @@ public interface CompressionDictionary
int size = row.getInt("dict_length");
String keyspaceName = row.getString("keyspace_name");
String tableName = row.getString("table_name");
+ String tableId = row.getString("table_id");
try
{
return new LightweightCompressionDictionary(keyspaceName,
tableName,
+ tableId,
new
DictId(CompressionDictionary.Kind.valueOf(kindStr), dictId),
checksum,
size);
@@ -430,21 +432,37 @@ public interface CompressionDictionary
{
public final String keyspaceName;
public final String tableName;
+ public final String tableId;
public final DictId dictId;
public final int checksum;
public final int size;
public LightweightCompressionDictionary(String keyspaceName,
String tableName,
+ String tableId,
DictId dictId,
int checksum,
int size)
{
this.keyspaceName = keyspaceName;
this.tableName = tableName;
+ this.tableId = tableId;
this.dictId = dictId;
this.checksum = checksum;
this.size = size;
}
+
+ @Override
+ public String toString()
+ {
+ return "LightweightCompressionDictionary{" +
+ "keyspaceName='" + keyspaceName + '\'' +
+ ", tableName='" + tableName + '\'' +
+ ", tableId='" + tableId + '\'' +
+ ", dictId=" + dictId +
+ ", checksum=" + checksum +
+ ", size=" + size +
+ '}';
+ }
}
}
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
index 3802c10aba..588bb418d3 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java
@@ -39,16 +39,25 @@ import static java.lang.String.format;
public class CompressionDictionaryDetailsTabularData
{
+ /**
+ * Position inside index names of tabular type of tabular data returned
upon
+ * listing dictionaries where table id is expected to be located.
+ * We do not always need to process this entry, e.g. when not listing
+ * orphaned compression dictionaries.
+ */
+ public static final int TABULAR_DATA_TYPE_TABLE_ID_INDEX = 2;
+
/**
* Position inside index names of tabular type of tabular data returned
upon
* listing dictionaries where raw dictionary is expected to be located.
* We do not need to process this entry as listing does not contain any
raw dictionary,
* only exporting does.
*/
- public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3;
+ public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 4;
public static final String KEYSPACE_NAME = "Keyspace";
public static final String TABLE_NAME = "Table";
+ public static final String TABLE_ID_NAME = "TableId";
public static final String DICT_ID_NAME = "DictId";
public static final String DICT_NAME = "Dict";
public static final String KIND_NAME = "Kind";
@@ -58,6 +67,7 @@ public class CompressionDictionaryDetailsTabularData
private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME,
TABLE_NAME,
+ TABLE_ID_NAME,
DICT_ID_NAME,
DICT_NAME,
KIND_NAME,
@@ -66,6 +76,7 @@ public class CompressionDictionaryDetailsTabularData
private static final String[] ITEM_DESCS = new String[]{ "keyspace",
"table",
+ "table_id",
"dictionary_id",
"dictionary_bytes",
"kind",
@@ -84,6 +95,7 @@ public class CompressionDictionaryDetailsTabularData
{
ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace
SimpleType.STRING, // table
+ SimpleType.STRING, // tableId
SimpleType.LONG, // dict id
new
ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes
SimpleType.STRING, // kind
@@ -115,6 +127,7 @@ public class CompressionDictionaryDetailsTabularData
{
dictionary.keyspaceName,
dictionary.tableName,
+ dictionary.tableId,
dictionary.dictId.id,
null, // on purpose not returning
actual dictionary
dictionary.dictId.kind.name(),
@@ -133,10 +146,11 @@ public class CompressionDictionaryDetailsTabularData
*
* @param keyspace keyspace of a dictionary
* @param table table of a dictionary
+ * @param tableId id of the table the dictionary is for
* @param dictionary dictionary itself
* @return composite data representing dictionary
*/
- public static CompositeData fromCompressionDictionary(String keyspace,
String table, CompressionDictionary dictionary)
+ public static CompositeData fromCompressionDictionary(String keyspace,
String table, String tableId, CompressionDictionary dictionary)
{
try
{
@@ -146,6 +160,7 @@ public class CompressionDictionaryDetailsTabularData
{
keyspace,
table,
+ tableId,
dictionary.dictId().id,
dictionary.rawDictionary(),
dictionary.kind().name(),
@@ -176,6 +191,7 @@ public class CompressionDictionaryDetailsTabularData
{
dataObject.keyspace,
dataObject.table,
+ dataObject.tableId,
dataObject.dictId,
dataObject.dict,
dataObject.kind,
@@ -200,6 +216,7 @@ public class CompressionDictionaryDetailsTabularData
{
return new CompressionDictionaryDataObject((String)
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME),
(String)
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME),
+ (String)
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_ID_NAME),
(Long)
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME),
(byte[])
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME),
(String)
compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME),
@@ -211,6 +228,7 @@ public class CompressionDictionaryDetailsTabularData
{
public final String keyspace;
public final String table;
+ public final String tableId;
public final long dictId;
public final byte[] dict;
public final String kind;
@@ -220,6 +238,7 @@ public class CompressionDictionaryDetailsTabularData
@JsonCreator
public CompressionDictionaryDataObject(@JsonProperty("keyspace")
String keyspace,
@JsonProperty("table") String
table,
+ @JsonProperty("tableId") String
tableId,
@JsonProperty("dictId") long
dictId,
@JsonProperty("dict") byte[]
dict,
@JsonProperty("kind") String
kind,
@@ -228,6 +247,7 @@ public class CompressionDictionaryDetailsTabularData
{
this.keyspace = keyspace;
this.table = table;
+ this.tableId = tableId;
this.dictId = dictId;
this.dict = dict;
this.kind = kind;
@@ -241,7 +261,7 @@ public class CompressionDictionaryDetailsTabularData
* An object of this class is considered to be valid if:
*
* <ul>
- * <li>keyspace and table are not null</li>
+ * <li>keyspace, table and table id are not null</li>
* <li>dict id is lower than 0</li>
* <li>dict is not null nor empty</li>
* <li>dict length is less than or equal to 1MiB</li>
@@ -257,6 +277,8 @@ public class CompressionDictionaryDetailsTabularData
throw new IllegalArgumentException("Keyspace not specified.");
if (table == null)
throw new IllegalArgumentException("Table not specified.");
+ if (tableId == null)
+ throw new IllegalArgumentException("Table id not specified");
if (dictId <= 0)
throw new IllegalArgumentException("Provided dictionary id
must be positive but it is '" + dictId + "'.");
if (dict == null || dict.length == 0)
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
index c2b0a32a76..ed4d270d0c 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java
@@ -87,7 +87,7 @@ public class CompressionDictionaryEventHandler implements
ICompressionDictionary
return;
}
- CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, dictionaryId.id);
+ CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, cfs.metadata().id.toLongString(), dictionaryId.id);
cache.add(dictionary);
}
catch (Exception e)
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
index 2e05fb9681..8adefad01d 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
@@ -57,6 +57,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
private final String keyspaceName;
private final String tableName;
+ private final String tableId;
private final ColumnFamilyStore columnFamilyStore;
private volatile boolean mbeanRegistered;
private volatile boolean isEnabled;
@@ -71,12 +72,13 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
{
this.keyspaceName = columnFamilyStore.keyspace.getName();
this.tableName = columnFamilyStore.getTableName();
+ this.tableId = columnFamilyStore.metadata().id.toLongString();
this.columnFamilyStore = columnFamilyStore;
this.isEnabled =
columnFamilyStore.metadata().params.compression.isDictionaryCompressionEnabled();
this.cache = new CompressionDictionaryCache();
this.eventHandler = new
CompressionDictionaryEventHandler(columnFamilyStore, cache);
- this.scheduler = new CompressionDictionaryScheduler(keyspaceName,
tableName, cache, isEnabled);
+ this.scheduler = new CompressionDictionaryScheduler(keyspaceName,
tableName, tableId, cache, isEnabled);
if (isEnabled)
{
// Initialize components
@@ -253,7 +255,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
@Override
public TabularData listCompressionDictionaries()
{
- List<LightweightCompressionDictionary> dictionaries =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName,
tableName);
+ List<LightweightCompressionDictionary> dictionaries =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName,
tableName, tableId);
TabularDataSupport tableData = new
TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
if (dictionaries == null)
@@ -272,21 +274,21 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
@Override
public CompositeData getCompressionDictionary()
{
- CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName,
tableName);
+ CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName,
tableName, tableId);
if (compressionDictionary == null)
return null;
- return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, compressionDictionary);
+ return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, tableId, compressionDictionary);
}
@Override
public CompositeData getCompressionDictionary(long dictId)
{
- CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, dictId);
+ CompressionDictionary compressionDictionary =
SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName,
tableName, tableId, dictId);
if (compressionDictionary == null)
return null;
- return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, compressionDictionary);
+ return
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName,
tableName, tableId, compressionDictionary);
}
@Override
@@ -316,7 +318,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
CompressionDictionary.DictId dictId = new
CompressionDictionary.DictId(kind, dataObject.dictId);
- LightweightCompressionDictionary latestCompressionDictionary =
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
tableName);
+ LightweightCompressionDictionary latestCompressionDictionary =
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
tableName, tableId);
if (latestCompressionDictionary != null &&
latestCompressionDictionary.dictId.id > dictId.id)
{
throw new IllegalArgumentException(format("Dictionary to import
has older dictionary id (%s) than the latest compression dictionary (%s) for
table %s.%s",
@@ -420,7 +422,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
return;
}
- SystemDistributedKeyspace.storeCompressionDictionary(keyspaceName,
tableName, dictionary);
+ SystemDistributedKeyspace.storeCompressionDictionary(keyspaceName,
tableName, tableId, dictionary);
cache.add(dictionary);
}
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
index e757364711..c6f5b6a922 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java
@@ -51,6 +51,7 @@ public class CompressionDictionaryScheduler implements
ICompressionDictionarySch
private final String keyspaceName;
private final String tableName;
+ private final String tableId;
private final ICompressionDictionaryCache cache;
private final AtomicBoolean manualTrainingInProgress = new
AtomicBoolean(false);
@@ -59,11 +60,13 @@ public class CompressionDictionaryScheduler implements
ICompressionDictionarySch
public CompressionDictionaryScheduler(String keyspaceName,
String tableName,
+ String tableId,
ICompressionDictionaryCache cache,
boolean isEnabled)
{
this.keyspaceName = keyspaceName;
this.tableName = tableName;
+ this.tableId = tableId;
this.cache = cache;
this.isEnabled = isEnabled;
}
@@ -135,7 +138,7 @@ public class CompressionDictionaryScheduler implements
ICompressionDictionarySch
return;
}
- CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName,
tableName);
+ CompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName,
tableName, tableId);
cache.add(dictionary);
}
catch (Exception e)
diff --git
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index 04ccb54345..31141b6852 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.CommonRange;
import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
@@ -199,13 +200,14 @@ public final class SystemDistributedKeyspace
public static final String COMPRESSION_DICTIONARIES_CQL = "CREATE TABLE IF
NOT EXISTS %s (" +
"keyspace_name
text," +
"table_name
text," +
+ "table_id text,"
+
"kind text," +
"dict_id
bigint," +
"dict blob," +
"dict_length
int," +
"dict_checksum
int," +
- "PRIMARY KEY
((keyspace_name, table_name), dict_id)) " +
- "WITH CLUSTERING
ORDER BY (dict_id DESC)"; // in order to retrieve the latest dictionary; the
contract is the newer the dictionary the larger the dict_id
+ "PRIMARY KEY
((keyspace_name, table_name), table_id, dict_id)) " +
+ "WITH CLUSTERING
ORDER BY (table_id DESC, dict_id DESC)"; // in order to retrieve the latest
dictionary; the contract is the newer the dictionary the larger the dict_id
private static final TableMetadata CompressionDictionariesTable =
parse(COMPRESSION_DICTIONARIES, "Compression dictionaries for
applicable tables", COMPRESSION_DICTIONARIES_CQL).build();
@@ -415,17 +417,22 @@ public final class SystemDistributedKeyspace
*
* @param keyspaceName the keyspace name to associate with the dictionary
* @param tableName the table name to associate with the dictionary
+ * @param tableId the unique id of a table to associate with the dictionary
* @param dictionary the compression dictionary to store
*/
- public static void storeCompressionDictionary(String keyspaceName, String
tableName, CompressionDictionary dictionary)
+ public static void storeCompressionDictionary(String keyspaceName,
+ String tableName,
+ String tableId,
+ CompressionDictionary
dictionary)
{
byte[] dict = dictionary.rawDictionary();
- String query = "INSERT INTO %s.%s (keyspace_name, table_name, kind,
dict_id, dict, dict_length, dict_checksum) VALUES ('%s', '%s', '%s', %s, ?, %s,
%s)";
+ String query = "INSERT INTO %s.%s (keyspace_name, table_name,
table_id, kind, dict_id, dict, dict_length, dict_checksum) VALUES ('%s', '%s',
'%s', '%s', %s, ?, %s, %s)";
String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
COMPRESSION_DICTIONARIES,
keyspaceName,
tableName,
+ tableId,
dictionary.kind(),
dictionary.dictId().id,
dict.length,
@@ -440,14 +447,15 @@ public final class SystemDistributedKeyspace
*
* @param keyspaceName the keyspace name to retrieve the dictionary for
* @param tableName the table name to retrieve the dictionary for
+ * @param tableId the id of the table to retrieve the dictionary for
* @return the latest compression dictionary for the specified keyspace
and table,
* or null if no dictionary exists or if an error occurs during
retrieval
*/
@Nullable
- public static CompressionDictionary
retrieveLatestCompressionDictionary(String keyspaceName, String tableName)
+ public static CompressionDictionary
retrieveLatestCompressionDictionary(String keyspaceName, String tableName,
String tableId)
{
- String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' LIMIT 1";
- String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
+ String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' LIMIT
1";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, tableId);
try
{
return
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
@@ -464,14 +472,15 @@ public final class SystemDistributedKeyspace
*
* @param keyspaceName the keyspace name to retrieve the dictionary for
* @param tableName the table name to retrieve the dictionary for
+ * @param tableId the table id to retrieve the dictionary for
* @return the latest compression dictionary for the specified keyspace
and table,
* or null if no dictionary exists or if an error occurs during
retrieval
*/
@Nullable
- public static LightweightCompressionDictionary
retrieveLightweightLatestCompressionDictionary(String keyspaceName, String
tableName)
+ public static LightweightCompressionDictionary
retrieveLightweightLatestCompressionDictionary(String keyspaceName, String
tableName, String tableId)
{
- String query = "SELECT keyspace_name, table_name, kind, dict_id,
dict_checksum, dict_length FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s' LIMIT 1";
- String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
+ String query = "SELECT keyspace_name, table_name, table_id, kind,
dict_id, dict_checksum, dict_length FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s' AND table_id='%s' LIMIT 1";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, tableId);
try
{
return
CompressionDictionary.createFromRowLightweight(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
@@ -488,15 +497,16 @@ public final class SystemDistributedKeyspace
*
* @param keyspaceName the keyspace name to retrieve the dictionary for
* @param tableName the table name to retrieve the dictionary for
+ * @param tableId the table id to retrieve the dictionary for
* @param dictionaryId the dictionary id to retrieve the dictionary for
* @return the compression dictionary identified by the specified
keyspace, table and dictionaryId,
* or null if no dictionary exists or if an error occurs during
retrieval
*/
@Nullable
- public static CompressionDictionary retrieveCompressionDictionary(String
keyspaceName, String tableName, long dictionaryId)
+ public static CompressionDictionary retrieveCompressionDictionary(String
keyspaceName, String tableName, String tableId, long dictionaryId)
{
- String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND dict_id=%s";
- String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, dictionaryId);
+ String query = "SELECT kind, dict_id, dict, dict_length, dict_checksum
FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' AND
dict_id=%s";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, tableId, dictionaryId);
try
{
return
CompressionDictionary.createFromRow(QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE).one());
@@ -512,17 +522,87 @@ public final class SystemDistributedKeyspace
*
* @param keyspaceName the keyspace name to retrieve the dictionary for
* @param tableName the table name to retrieve the dictionary for
+ * @param tableId the table id to retrieve the dictionary for
* @return the compression dictionaries identified by the specified
keyspace and table,
- * or null if no dictionary exists or if an error occurs during
retrieval
+ * empty list if no dictionary exists or null if an error occurs
during retrieval
+ */
+ @Nullable
+ public static List<LightweightCompressionDictionary>
retrieveLightweightCompressionDictionaries(String keyspaceName, String
tableName, String tableId)
+ {
+ String query = "SELECT keyspace_name, table_name, table_id, kind,
dict_id, dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s' AND table_id='%s'";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName, tableId);
+ return retrieveLightweightCompressionDictionariesInternal(fmtQuery);
+ }
+
+ /**
+ * Retrieves all compression dictionaries in a lightweight form.
+ *
+ * @return all compression dictionaries, lightweight form, empty list if
no dictionary exists
+ * or null if an error occurs during retrieval
*/
@Nullable
- public static List<LightweightCompressionDictionary>
retrieveLightweightCompressionDictionaries(String keyspaceName, String
tableName)
+ public static List<LightweightCompressionDictionary>
retrieveLightweightCompressionDictionaries()
+ {
+ String query = "SELECT keyspace_name, table_name, table_id, kind,
dict_id, dict_length, dict_checksum FROM %s.%s";
+ String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES);
+ return retrieveLightweightCompressionDictionariesInternal(fmtQuery);
+ }
+
+ /**
+ * Retrieves all orphaned compression dictionaries in a lightweight form.
+ *
+ * @return all orphaned compression dictionaries, lightweight form; an empty
list if there are none
+ * or if an error occurs during retrieval (never null)
+ */
+ public static List<LightweightCompressionDictionary>
retrieveOrphanedLightweightCompressionDictionaries()
+ {
+ List<LightweightCompressionDictionary> dicts =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+ if (dicts == null || dicts.isEmpty())
+ return List.of();
+
+ List<LightweightCompressionDictionary> orphaned = new ArrayList<>();
+ for (LightweightCompressionDictionary dict : dicts)
+ {
+ TableMetadata tableMetadata =
ClusterMetadata.current().schema.getTableMetadata(dict.keyspaceName,
dict.tableName);
+ if (tableMetadata == null ||
!tableMetadata.id.toLongString().equals(dict.tableId))
+ orphaned.add(dict);
+ }
+
+ return orphaned;
+ }
+
+ /**
+ * Removes all orphaned compression dictionaries.
+ */
+ public static void clearOrphanedCompressionDictionaries()
+ {
+ for (LightweightCompressionDictionary orphanedDict :
SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries())
+ {
+ try
+ {
+ QueryProcessor.execute(String.format("DELETE FROM %s.%s WHERE
keyspace_name='%s' AND table_name='%s' AND table_id='%s' AND dict_id=%s",
+
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
+ orphanedDict.keyspaceName,
+ orphanedDict.tableName,
+ orphanedDict.tableId,
+ orphanedDict.dictId.id),
+ ConsistencyLevel.ONE);
+ }
+ catch (Exception e)
+ {
+ logger.error("Unable to delete orphaned compression
dictionary: {}, Reason: {}",
+ orphanedDict.toString(),
+ e.getMessage());
+ }
+ }
+ }
+
+ private static List<LightweightCompressionDictionary>
retrieveLightweightCompressionDictionariesInternal(String query)
{
- String query = "SELECT keyspace_name, table_name, kind, dict_id,
dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND
table_name='%s'";
- String fmtQuery = format(query,
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES,
keyspaceName, tableName);
try
{
- UntypedResultSet result = QueryProcessor.execute(fmtQuery,
ConsistencyLevel.ONE);
+ UntypedResultSet result = QueryProcessor.execute(query,
ConsistencyLevel.ONE);
if (result.isEmpty())
return Collections.emptyList();
List<LightweightCompressionDictionary> dictionaries = new
ArrayList<>();
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index dc9d9e3f68..c9c8f4ed10 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -60,6 +60,7 @@ import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
@@ -105,6 +106,8 @@ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.dht.BootStrapper;
@@ -5782,4 +5785,25 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
}
return sstablesTouched;
}
+
+ @Override
+ public TabularData getOrphanedCompressionDictionaries()
+ {
+ List<LightweightCompressionDictionary> dicts =
SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries();
+ TabularDataSupport tabularData = new
TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
+
+ if (dicts.isEmpty())
+ return tabularData;
+
+ for (LightweightCompressionDictionary dict : dicts)
+
tabularData.put(CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(dict));
+
+ return tabularData;
+ }
+
+ @Override
+ public void clearOrphanedCompressionDictionaries()
+ {
+ SystemDistributedKeyspace.clearOrphanedCompressionDictionaries();
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index be54b4fc76..da03f51a47 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1400,4 +1400,8 @@ public interface StorageServiceMBean extends
NotificationEmitter
/** Mutates the repaired state of all SSTables for the given SSTables */
public List<String> mutateSSTableRepairedState(boolean repaired, boolean
preview, String keyspace, List<String> tables);
+
+ TabularData getOrphanedCompressionDictionaries();
+
+ void clearOrphanedCompressionDictionaries();
}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 52ed11509b..567df09283 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -2819,6 +2819,20 @@ public class NodeProbe implements AutoCloseable
return TrainingState.fromCompositeData(compositeData);
}
+ /**
+ * Gets all compression dictionaries which are orphaned, that is, dictionaries whose table was dropped.
+ * @return tabular data of orphaned dictionaries
+ */
+ public TabularData getOrphanedCompressionDictionaries()
+ {
+ return ssProxy.getOrphanedCompressionDictionaries();
+ }
+ /** Deletes orphaned compression dictionaries (see {@link #getOrphanedCompressionDictionaries()}). */
+ public void clearOrphanedCompressionDictionaries()
+ {
+ ssProxy.clearOrphanedCompressionDictionaries();
+ }
+
private CompressionDictionaryManagerMBean getDictionaryManagerProxy(String
keyspace, String table) throws IOException
{
// Construct table-specific MBean name
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
index 757260f66e..22a3affcbf 100644
---
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
+++
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -37,6 +38,7 @@ import
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.Trainin
import org.apache.cassandra.db.compression.TrainingState;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
import org.apache.cassandra.utils.Clock;
@@ -61,7 +63,8 @@ import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MA
subcommands = {
CompressionDictionaryCommandGroup.TrainDictionary.class,
CompressionDictionaryCommandGroup.ListDictionaries.class,
CompressionDictionaryCommandGroup.ExportDictionary.class,
-
CompressionDictionaryCommandGroup.ImportDictionary.class })
+
CompressionDictionaryCommandGroup.ImportDictionary.class,
+
CompressionDictionaryCommandGroup.CleanupDictionaries.class})
public class CompressionDictionaryCommandGroup
{
@Command(name = "train",
@@ -216,40 +219,43 @@ public class CompressionDictionaryCommandGroup
@Override
protected void execute(NodeProbe probe)
{
- try
- {
- TableBuilder tableBuilder = new TableBuilder();
- TabularData tabularData =
probe.listCompressionDictionaries(keyspace, table);
- List<String> indexNames =
tabularData.getTabularType().getIndexNames();
-
- List<String> columns = new ArrayList<>(indexNames);
- // ignore raw dict
-
columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX);
- tableBuilder.add(columns);
-
- for (Object eachDict : tabularData.keySet())
- {
- final List<?> dictRow = (List<?>) eachDict;
-
- List<String> rowValues = new ArrayList<>();
-
- for (int i = 0; i < dictRow.size(); i++)
- {
- // ignore raw dict
- if (i ==
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX)
- continue;
+ ListingHelper.list(probe,
+ () ->
+ {
+ try
+ {
+ return
probe.listCompressionDictionaries(keyspace, table);
+ }
+ catch (Exception e) // keep e's own message: ListingHelper reports only getMessage(), as the pre-refactoring code did
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ },
+
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_TABLE_ID_INDEX,
+
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX)); // table id and raw dictionary are not shown
+ }
+ }
- rowValues.add(dictRow.get(i).toString());
- }
- tableBuilder.add(rowValues);
- }
@Command(name = "cleanup", description = "Clean up orphaned dictionaries
by deleting them from " + SystemDistributedKeyspace.NAME
+ '.' +
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES +
+ " table, these are ones for which
a table they were trained for was dropped.")
+ public static class CleanupDictionaries extends AbstractCommand
+ {
+ @Option(names = {"-d", "--dry"}, description = "Only display orphaned
dictionaries, do not remove them.")
+ private boolean dry; // set by -d / --dry
- tableBuilder.printTo(probe.output().out);
+ @Override
+ protected void execute(NodeProbe probe)
+ {
+ if (dry) // dry run: list the orphaned dictionaries (raw dictionary column hidden) without deleting anything
+ {
+ ListingHelper.list(probe,
+ probe::getOrphanedCompressionDictionaries,
+
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX));
}
- catch (Exception e)
+ else
{
- probe.output().err.printf("Failed to list dictionaries: %s%n",
e.getMessage());
- System.exit(1);
+ probe.clearOrphanedCompressionDictionaries(); // performs the deletion; nothing is printed on success
}
}
}
@@ -368,4 +374,66 @@ public class CompressionDictionaryCommandGroup
}
}
}
+
+ private static class ListingHelper
+ {
+ /**
+ * Lists dictionaries as a table; the concrete means of querying them is delegated to a specific subcommand.
+ *
+ * @param probe probe to query Cassandra with
+ * @param tabularDataSupplier supplier of tabular data containing dictionaries to display
+ * @param removedColumnsIndices indexes of columns in tabular data to be ignored
+ * when displaying them; nothing (not even the header) is printed when no value is left to display
+ */
+ public static void list(NodeProbe probe,
+ Supplier<TabularData> tabularDataSupplier,
+ List<Integer> removedColumnsIndices)
+ {
+ try
+ {
+ TableBuilder tableBuilder = new TableBuilder();
+ TabularData tabularData = tabularDataSupplier.get();
+ List<String> indexNames = tabularData.getTabularType().getIndexNames();
+
+ // ignore columns not meant to be displayed to a user
+ List<String> columns = new ArrayList<>();
+ for (int i = 0; i < indexNames.size(); i++)
+ {
+ if (!removedColumnsIndices.contains(i))
+ {
+ columns.add(indexNames.get(i));
+ }
+ }
+
+ tableBuilder.add(columns);
+
+ boolean hasOutput = false;
+ for (Object eachDict : tabularData.keySet())
+ {
+ final List<?> dictRow = (List<?>) eachDict;
+
+ List<String> rowValues = new ArrayList<>();
+
+ for (int i = 0; i < dictRow.size(); i++)
+ {
+ // ignore columns not meant to be displayed to a user
+ if (removedColumnsIndices.contains(i))
+ continue;
+ // String.valueOf tolerates null values instead of failing the whole listing
+ rowValues.add(String.valueOf(dictRow.get(i)));
+ hasOutput = true;
+ }
+ tableBuilder.add(rowValues);
+ }
+
+ if (hasOutput)
+ tableBuilder.printTo(probe.output().out);
+ }
+ catch (Exception e)
+ {
+ probe.output().err.printf("Failed to list dictionaries: %s%n", e.getMessage());
+ System.exit(1);
+ }
+ }
+ }
}
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary
index 92b9ecbe5d..52648bf1bc 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary
@@ -34,6 +34,12 @@ SYNOPSIS
[(-u <username> | --username <username>)]
compressiondictionary import
[--] <dictionaryPath>
+ nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+ [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
+ [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
+ [(-u <username> | --username <username>)]
compressiondictionary cleanup
+ [(-d | --dry)]
+
OPTIONS
-h <host>, --host <host>
Node hostname or ip address
@@ -82,3 +88,9 @@ COMMANDS
dictionary is returned.
import
Import local dictionary to Cassandra.
+ cleanup
+ Clean up orphaned dictionaries by deleting them from
system_distributed.
+ compression_dictionaries table, these are ones for which a table
they were
+ trained for was dropped.
+
+ With --dry option, Only display orphaned dictionaries, do not
remove them.
diff --git a/test/resources/nodetool/help/compressiondictionary$cleanup
b/test/resources/nodetool/help/compressiondictionary$cleanup
new file mode 100644
index 0000000000..6fae13d218
--- /dev/null
+++ b/test/resources/nodetool/help/compressiondictionary$cleanup
@@ -0,0 +1,34 @@
+NAME
+ nodetool compressiondictionary cleanup - Clean up orphaned dictionaries
+ by deleting them from system_distributed.compression_dictionaries
+ table, these are ones for which a table they were trained for was
+ dropped.
+
+SYNOPSIS
+ nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
+ [(-pp | --print-port)] [(-pw <password> | --password
<password>)]
+ [(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
+ [(-u <username> | --username <username>)]
compressiondictionary cleanup
+ [(-d | --dry)]
+
+OPTIONS
+ -d, --dry
+ Only display orphaned dictionaries, do not remove them.
+
+ -h <host>, --host <host>
+ Node hostname or ip address
+
+ -p <port>, --port <port>
+ Remote jmx agent port number
+
+ -pp, --print-port
+ Operate in 4.0 mode with hosts disambiguated by port number
+
+ -pw <password>, --password <password>
+ Remote jmx agent password
+
+ -pwf <passwordFilePath>, --password-file <passwordFilePath>
+ Path to the JMX password file
+
+ -u <username>, --username <username>
+ Remote jmx agent username
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
index 47be2334fd..268c8222bd 100644
---
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryDataObjectTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.compression;
+import java.util.UUID;
import java.util.function.Consumer;
import javax.management.openmbean.CompositeData;
@@ -38,6 +39,7 @@ public class CompressionDictionaryDataObjectTest
{
private static final String KEYSPACE = "ks";
private static final String TABLE = "tb";
+ private static final String TABLE_ID = UUID.randomUUID().toString();
private static final CompressionDictionary COMPRESSION_DICTIONARY =
CompressionDictionaryHelper.INSTANCE.trainDictionary(KEYSPACE, TABLE);
private static final CompressionDictionaryDataObject VALID_OBJECT =
createValidObject();
@@ -59,7 +61,7 @@ public class CompressionDictionaryDataObjectTest
@Test
public void testConversionOfCompressionDictionaryToDataObject()
{
- CompositeData compositeData =
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(KEYSPACE,
TABLE, COMPRESSION_DICTIONARY);
+ CompositeData compositeData =
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(KEYSPACE,
TABLE, TABLE_ID, COMPRESSION_DICTIONARY);
CompressionDictionaryDataObject dataObject =
CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData);
assertEquals(KEYSPACE, dataObject.keyspace);
@@ -76,6 +78,7 @@ public class CompressionDictionaryDataObjectTest
{
LightweightCompressionDictionary lightweight = new
LightweightCompressionDictionary(KEYSPACE,
TABLE,
+
TABLE_ID,
COMPRESSION_DICTIONARY.dictId(),
COMPRESSION_DICTIONARY.checksum(),
COMPRESSION_DICTIONARY.rawDictionary().length);
@@ -84,6 +87,7 @@ public class CompressionDictionaryDataObjectTest
assertEquals(KEYSPACE,
compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME));
assertEquals(TABLE,
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME));
+ assertEquals(TABLE_ID,
compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_ID_NAME));
assertEquals(COMPRESSION_DICTIONARY.dictId().id,
compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME));
assertNull(compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME));
assertEquals(COMPRESSION_DICTIONARY.dictId().kind.name(),
compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME));
@@ -96,6 +100,7 @@ public class CompressionDictionaryDataObjectTest
{
assertInvalid(modifier -> modifier.withKeyspace(null), "Keyspace not
specified.");
assertInvalid(modifier -> modifier.withTable(null), "Table not
specified.");
+ assertInvalid(modifier -> modifier.withTableId(null), "Table id not
specified");
assertInvalid(modifier -> modifier.withDictId(-1), "Provided
dictionary id must be positive but it is '-1'.");
assertInvalid(modifier -> modifier.withDict(null), "Provided
dictionary byte array is null or empty.");
assertInvalid(modifier -> modifier.withDict(new byte[0]), "Provided
dictionary byte array is null or empty.");
@@ -127,6 +132,7 @@ public class CompressionDictionaryDataObjectTest
{
return new CompressionDictionaryDataObject("ks",
"tb",
+ TABLE_ID,
123,
COMPRESSION_DICTIONARY.rawDictionary(),
CompressionDictionary.Kind.ZSTD.name(),
@@ -140,6 +146,7 @@ public class CompressionDictionaryDataObjectTest
{
private String keyspace;
private String table;
+ private String tableId;
private long dictId;
private byte[] dict;
private String kind;
@@ -150,6 +157,7 @@ public class CompressionDictionaryDataObjectTest
{
withKeyspace(from.keyspace);
withTable(from.table);
+ withTableId(from.tableId);
withDictId(from.dictId);
withDict(from.dict);
withKind(from.kind);
@@ -159,7 +167,7 @@ public class CompressionDictionaryDataObjectTest
public CompressionDictionaryDataObject build()
{
- return new CompressionDictionaryDataObject(keyspace, table,
dictId, dict, kind, dictChecksum, dictLength);
+ return new CompressionDictionaryDataObject(keyspace, table,
tableId, dictId, dict, kind, dictChecksum, dictLength); // tableId may be null here so validation ("Table id not specified") can be exercised
}
public DataObjectModifier withKeyspace(String keyspace)
@@ -174,6 +182,12 @@ public class CompressionDictionaryDataObjectTest
return this;
}
+ public DataObjectModifier withTableId(String tableId) // overrides the table id copied from the valid object; null is accepted on purpose
+ {
+ this.tableId = tableId;
+ return this;
+ }
+
public DataObjectModifier withDictId(long dictId)
{
this.dictId = dictId;
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
new file mode 100644
index 0000000000..b42da12554
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryOrphanedTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.tools.ToolRunner.ToolResult;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class CompressionDictionaryOrphanedTest extends CQLTester // covers detection and cleanup of dictionaries left behind by dropped tables
+{
+ private static final String tableName = "mytable";
+
+ @BeforeClass
+ public static void setup() throws Throwable
+ {
+ requireNetwork();
+ startJMXServer(); // the invokeNodetool(...) calls below go through JMX
+ }
+
+ @Test
+ public void testOrphanedCompressionDictionaries()
+ {
+ String firstTableId = createDictTable();
+ trainDictionary();
+ trainDictionary();
+
+ assertDicts(firstTableId);
+
+ // drop the table, which leaves its two dictionaries orphaned
+ dropDictTable();
+ // orphaned dictionaries are the ones whose table no longer exists
+ assertOrphaned(firstTableId);
+
+ // create a new table with the same name and train it twice; the two dictionaries of the dropped table stay orphaned
+ String secondTableId = createDictTable();
+ trainDictionary();
+ trainDictionary();
+
+ // still two from the first run
+ assertOrphaned(firstTableId);
+ // call nodetool to clear orphaned dictionaries
+ cleanupOrphaned();
+ // verify that orphaned dictionaries were cleared
+ assertNoOrphaned();
+
+ // now only the second table's dictionaries remain; the first table is dropped and nothing is orphaned
+ // dropping the second table (same name) orphans its two dictionaries as well
+ assertDicts(secondTableId);
+
+ dropDictTable();
+ assertOrphaned(secondTableId);
+ cleanupOrphaned();
+ assertNoOrphaned();
+ }
+
+ private String getTableId()
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace(),
tableName);
+ Assert.assertNotNull(cfs);
+ return cfs.metadata.id.toLongString(); // string form compared against LightweightCompressionDictionary.tableId below
+ }
+
+ private String createDictTable()
+ {
+ schemaChange(format("CREATE TABLE %s.%s (id int PRIMARY KEY, data
text)" +
+ " WITH compression = {'class':
'ZstdDictionaryCompressor'}",
+ keyspace(), tableName));
+
+ return getTableId();
+ }
+
+ private void dropDictTable()
+ {
+ schemaChange(format("DROP TABLE %s.%s", keyspace(), tableName));
+ }
+
+ private void assertDicts(String tableId)
+ {
+ List<LightweightCompressionDictionary> dicts =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+ assertNotNull(dicts);
+ assertEquals(2, dicts.size());
+ assertEquals(tableId, dicts.get(0).tableId);
+ assertEquals(tableId, dicts.get(1).tableId);
+
+ }
+
+ private void assertOrphaned(String tableId)
+ {
+ List<LightweightCompressionDictionary> orphaned =
SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries();
+ assertNotNull(orphaned);
+ assertEquals(2, orphaned.size());
+ assertEquals(tableId, orphaned.get(0).tableId);
+ assertEquals(tableId, orphaned.get(1).tableId);
+
+ ToolResult toolResult = invokeNodetool("compressiondictionary",
"cleanup", "--dry");
+ toolResult.asserts().success();
+ String[] split = toolResult.getStdout().split(System.lineSeparator());
+ // split[0] is the header, followed by one line per orphaned dictionary
+ assertEquals(3, split.length);
+ assertTrue(split[1].contains(tableId));
+ assertTrue(split[2].contains(tableId));
+ }
+
+ private void cleanupOrphaned()
+ {
+ invokeNodetool("compressiondictionary", "cleanup").asserts().success();
+ }
+
+ private void assertNoOrphaned()
+ {
+ ToolResult toolResult = invokeNodetool("compressiondictionary",
"cleanup", "--dry");
+ toolResult.asserts().success();
+ assertTrue(toolResult.getStdout().isBlank()); // --dry prints nothing, not even a header, when nothing is orphaned
+ }
+
+ private void trainDictionary()
+ {
+ createSSTables();
+
+ // Test training command with --force since we have limited test data
+ ToolResult result = invokeNodetool("compressiondictionary", "train",
"--force", keyspace(), tableName);
+ result.assertOnCleanExit();
+
+ assertThat(result.getStdout())
+ .as("Should indicate training completed")
+ .contains("Training completed successfully")
+ .contains(keyspace())
+ .contains(tableName);
+ }
+
+ private void createSSTables()
+ {
+ for (int file = 0; file < 10; file++)
+ {
+ int batchSize = 1000;
+ for (int i = 0; i < batchSize; i++)
+ {
+ int index = i + file * batchSize;
+ executeFormattedQuery(format("INSERT INTO %s.%s (id, data)
VALUES (?, ?)", keyspace(), tableName),
+ index, "test data " + index);
+ }
+
+ flush(); // one flush per batch, so each batch becomes its own SSTable for training
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
index e9c5b00b47..e99a318358 100644
---
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
@@ -61,9 +61,9 @@ public class CompressionDictionarySchedulerTest extends
CQLTester
{
String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data
text) " +
"WITH compression = {'class':
'ZstdDictionaryCompressor'}");
- scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, cache,
true);
-
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
+ scheduler = new CompressionDictionaryScheduler(KEYSPACE, table,
cfs.metadata.id.toLongString(), cache, true);
+
try (CompressionDictionaryManager manager =
cfs.compressionDictionaryManager())
{
Set<SSTableReader> sstables = new HashSet<>();
@@ -81,9 +81,9 @@ public class CompressionDictionarySchedulerTest extends
CQLTester
{
String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data
text) " +
"WITH compression = {'class':
'ZstdDictionaryCompressor', 'chunk_length_in_kb': '4'}");
- scheduler = new CompressionDictionaryScheduler(KEYSPACE, table, cache,
true);
-
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
+ scheduler = new CompressionDictionaryScheduler(KEYSPACE, table,
cfs.metadata.id.toLongString(), cache, true);
+
cfs.disableAutoCompaction();
try (CompressionDictionaryManager manager =
cfs.compressionDictionaryManager())
{
diff --git
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
index 11b0764a1a..2a3fd24862 100644
---
a/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
+++
b/test/unit/org/apache/cassandra/schema/SystemDistributedKeyspaceCompressionDictionaryTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.schema;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
@@ -79,12 +80,12 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
@Test
public void testStoreCompressionDictionary() throws Exception
{
+ String tableID = UUID.randomUUID().toString();
// Store a dictionary
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary1);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary1);
// Verify it was stored
- CompressionDictionary retrieved =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE);
+ CompressionDictionary retrieved =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID);
assertThat(retrieved)
.as("Retrieved dictionary should not be null")
@@ -106,15 +107,15 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
}
@Test
- public void testStoreMultipleDictionaries() throws Exception
+ public void testStoreMultipleDictionaries()
{
+ String tableID = UUID.randomUUID().toString();
// Store multiple dictionaries for the same table
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary1);
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary2);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary1);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary2);
// Should retrieve the latest one (higher ID due to clustering order)
- CompressionDictionary latest =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE);
+ CompressionDictionary latest =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID);
assertThat(latest)
.as("Should retrieve the latest dictionary")
@@ -128,17 +129,18 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
}
@Test
- public void testRetrieveSpecificDictionary() throws Exception
+ public void testRetrieveSpecificDictionary()
{
+ String tableID = UUID.randomUUID().toString();
// Store both dictionaries
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary1);
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary2);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary1);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary2);
// Retrieve specific dictionary by ID
CompressionDictionary dict1 =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, 100L);
+ TEST_KEYSPACE, TEST_TABLE, tableID, 100L);
CompressionDictionary dict2 =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, 200L);
+ TEST_KEYSPACE, TEST_TABLE, tableID, 200L);
assertThat(dict1)
.as("Should retrieve dictionary 1")
@@ -165,7 +167,7 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
{
// Try to retrieve dictionary that doesn't exist
CompressionDictionary nonExistent =
SystemDistributedKeyspace.retrieveLatestCompressionDictionary(
- "nonexistent_keyspace", "nonexistent_table");
+ "nonexistent_keyspace", "nonexistent_table", "nonexistingid");
assertThat(nonExistent)
.as("Should return null for non-existent dictionary")
@@ -173,7 +175,7 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
// Try to retrieve specific dictionary that doesn't exist
CompressionDictionary nonExistentById =
SystemDistributedKeyspace.retrieveCompressionDictionary(
- TEST_KEYSPACE, TEST_TABLE, 999L);
+ TEST_KEYSPACE, TEST_TABLE, "nonexistingid",999L);
assertThat(nonExistentById)
.as("Should return null for non-existent dictionary ID")
@@ -181,18 +183,20 @@ public class
SystemDistributedKeyspaceCompressionDictionaryTest extends CQLTeste
}
@Test
- public void testStoredDictionaryIncludesLengthAndChecksum() throws
Exception
+ public void testStoredDictionaryIncludesLengthAndChecksum()
{
+ String tableID = UUID.randomUUID().toString();
// Store a dictionary
- SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, testDictionary1);
+ SystemDistributedKeyspace.storeCompressionDictionary(TEST_KEYSPACE,
TEST_TABLE, tableID, testDictionary1);
// Query the table directly to verify dict_length and dict_checksum
are stored
- String query = String.format("SELECT dict_length, dict_checksum FROM
%s.%s WHERE keyspace_name = '%s' AND table_name = '%s' AND dict_id = %d",
+ String query = String.format("SELECT dict_length, dict_checksum FROM
%s.%s WHERE keyspace_name = '%s' AND table_name = '%s' AND dict_id = %d AND
table_id = '%s'",
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
TEST_KEYSPACE,
TEST_TABLE,
- testDictionary1.dictId().id);
+ testDictionary1.dictId().id,
+ tableID);
var resultSet = QueryProcessor.executeInternal(query);
diff --git
a/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
index ea4e72801b..f831b34703 100644
---
a/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
+++
b/test/unit/org/apache/cassandra/tools/nodetool/ExportImportListCompressionDictionaryTest.java
@@ -63,7 +63,7 @@ public class ExportImportListCompressionDictionaryTest
extends CQLTester
importDictionary(pair.right);
list(pair.left, firstTable);
- list(pair.left, secondTable);
+ list(export(secondTable, null).left, secondTable);
}
@Test
@@ -87,6 +87,7 @@ public class ExportImportListCompressionDictionaryTest
extends CQLTester
// test non-existing keyspace / table
serializeToJsonFile(new CompressionDictionaryDataObject("abc",
"def",
+ "123",
pair.left.dictId,
pair.left.dict,
pair.left.kind,
@@ -177,6 +178,7 @@ public class ExportImportListCompressionDictionaryTest
extends CQLTester
{
serializeToJsonFile(new
CompressionDictionaryDataObject(pair.left.keyspace,
table,
+
pair.left.tableId,
pair.left.dictId,
pair.left.dict,
pair.left.kind,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]