This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9303ca04de Introduce a check for minimum time to pass to train or
import a compression dictionary from the last one
9303ca04de is described below
commit 9303ca04dec5714aa37723a2ca5cb470fa617efe
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Feb 23 16:40:51 2026 +0100
Introduce a check for minimum time to pass to train or import a compression
dictionary from the last one
patch by Stefan Miklosovic; reviewed by Yifan Cai for CASSANDRA-21179
---
CHANGES.txt | 1 +
.../pages/managing/operating/compression.adoc | 9 +-
.../compression/CompressionDictionaryManager.java | 63 +++++-
.../io/compress/IDictionaryCompressor.java | 29 ++-
.../io/compress/ZstdDictionaryCompressor.java | 21 +-
...CompressionDictionaryTrainingFrequencyTest.java | 242 +++++++++++++++++++++
6 files changed, 349 insertions(+), 16 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9d0aafacdb..5b08d8b08c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Introduce a check for minimum time to pass to train or import a compression
dictionary from the last one (CASSANDRA-21179)
* Allow overriding compaction strategy parameters during startup
(CASSANDRA-21169)
* Introduce created_at column to system_distributed.compression_dictionaries
(CASSANDRA-21178)
* Be able to detect and remove orphaned compression dictionaries
(CASSANDRA-21157)
diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc
b/doc/modules/cassandra/pages/managing/operating/compression.adoc
index 97de5c12c0..134c256934 100644
--- a/doc/modules/cassandra/pages/managing/operating/compression.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -302,6 +302,7 @@ These parameters are meant to be configured via CQL for
each respective table if
* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of
sample data to collect for training, approximately 10MB. This parameter is
configured in the
table's compression options for `ZstdDictionaryCompressor`.
* `training_max_dictionary_size` (default: `64KiB`): Maximum size of trained
dictionaries in bytes. Larger dictionaries can capture more patterns but
increase memory overhead. This is a parameter of `ZstdDictionaryCompressor` of
a table, in `compression` section.
+* `training_min_frequency` (default: `0m`): Minimum time which needs to pass
until we can train another compression dictionary. For example, if this
property is set to `1h`, then we can train another dictionary no earlier than 1
hour after the last training was conducted. `0m`, default, means we can train
as frequently as we want. The purpose of this parameter is to prevent excessive
training which might not make sense from an operational and performance
perspective. If an operator wants to [...]
Example:
@@ -312,13 +313,13 @@ ALTER TABLE keyspace.table
'class': 'ZstdDictionaryCompressor',
'compression_level': '3',
'training_max_total_sample_size': '20MiB',
- 'training_max_dictionary_size': '128KiB'
+ 'training_max_dictionary_size': '128KiB',
+ 'training_min_frequency': '1d'
};
----
-It is possible to override these training parameters by `nodetool
compressiondictionary train` command as
-explained in the section futher down below. If `train` subcommand do not
override them, CQL parameters are
-taken into account.
+It is possible to override these training parameters by `nodetool
compressiondictionary train` command (except `training_min_frequency` which is
configurable via CQL only) as
+explained in the section further down below. If the `train` subcommand does not
override them, CQL parameters are taken into account.
== Other options
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
index 8adefad01d..1207ed034f 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.db.compression;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,20 +35,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.db.ColumnFamilyStore;
import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MBeanWrapper.OnException;
import static java.lang.String.format;
import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MIN_FREQUENCY;
import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME;
public class CompressionDictionaryManager implements
CompressionDictionaryManagerMBean,
ICompressionDictionaryCache,
@@ -219,6 +225,12 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
// resolve training config and fail fast when invalid, so we do not
reach logic which would e.g. flush unnecessarily.
CompressionDictionaryTrainingConfig trainingConfig =
createTrainingConfig(parameters);
+ LightweightCompressionDictionary dictionary =
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(columnFamilyStore.getKeyspaceName(),
+
columnFamilyStore.getTableName(),
+
columnFamilyStore.metadata.id.toLongString());
+
+ checkTrainingFrequency(dictionary);
+
// SSTable-based training: sample from existing SSTables
Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables();
if (sstables.isEmpty())
@@ -319,10 +331,15 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
CompressionDictionary.DictId dictId = new
CompressionDictionary.DictId(kind, dataObject.dictId);
LightweightCompressionDictionary latestCompressionDictionary =
SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName,
tableName, tableId);
- if (latestCompressionDictionary != null &&
latestCompressionDictionary.dictId.id > dictId.id)
+ if (latestCompressionDictionary != null)
{
- throw new IllegalArgumentException(format("Dictionary to import
has older dictionary id (%s) than the latest compression dictionary (%s) for
table %s.%s",
- dictId.id,
latestCompressionDictionary.dictId.id, keyspaceName, tableName));
+ if (latestCompressionDictionary.dictId.id > dictId.id)
+ {
+ throw new IllegalArgumentException(format("Dictionary to
import has older dictionary id (%s) than the latest compression dictionary (%s)
for table %s.%s",
+ dictId.id,
latestCompressionDictionary.dictId.id, keyspaceName, tableName));
+ }
+
+ checkTrainingFrequency(latestCompressionDictionary);
}
handleNewDictionary(kind.createDictionary(dictId, dataObject.dict,
dataObject.dictChecksum));
@@ -394,6 +411,46 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE);
}
+ private DurationSpec.IntMinutesBound
getCompressionDictionaryMinTrainingFrequency(CompressionParams
compressionParams)
+ {
+ String resolvedValue =
compressionParams.getOtherOptions().getOrDefault(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
DEFAULT_TRAINING_MIN_FREQUENCY);
+
+ try
+ {
+ return new DurationSpec.IntMinutesBound(resolvedValue);
+ }
+ catch (Throwable t)
+ {
+ throw new IllegalArgumentException(String.format("Invalid value
for %s: %s. Reason: %s",
+
TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
+ resolvedValue,
+ t.getMessage()));
+ }
+ }
+
+ private void checkTrainingFrequency(LightweightCompressionDictionary
lastDictionary)
+ {
+ Instant lastTraining = lastDictionary == null ? null :
lastDictionary.createdAt;
+ DurationSpec.IntMinutesBound minTrainingFrequency =
getCompressionDictionaryMinTrainingFrequency(columnFamilyStore.metadata().params.compression);
+
+ // if there is no dictionary trained so far or min frequency is 0 -
that is we can train as often as we want -
+ // then do not check if we can
+ if (lastTraining != null && minTrainingFrequency.toMinutes() != 0)
+ {
+ Instant now = FBUtilities.now();
+ int minTrainingFrequencyMinutes = minTrainingFrequency.toMinutes();
+ if (lastTraining.isAfter(now.minus(minTrainingFrequencyMinutes,
ChronoUnit.MINUTES)))
+ {
+ Instant nextEarliestTraining =
lastTraining.plus(minTrainingFrequencyMinutes, ChronoUnit.MINUTES);
+ throw new IllegalArgumentException(format("The next training
or importing can occur only at least after %s from the last training which
happened at %s. " +
+ "You can train again
no earlier than at %s.",
+ minTrainingFrequency,
+ lastTraining,
+
nextEarliestTraining));
+ }
+ }
+ }
+
private int internalTrainingParameterResolution(CompressionParams
compressionParams,
String userSuppliedValue,
String parameterName,
diff --git
a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
index fd4ce62ea3..9296810575 100644
--- a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.io.compress;
import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.db.compression.CompressionDictionary;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -41,6 +42,11 @@ public interface IDictionaryCompressor<T extends
CompressionDictionary>
String TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME =
"training_max_total_sample_size";
String DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE = "10MiB";
+ String TRAINING_MIN_FREQUENCY_PARAMETER_NAME = "training_min_frequency";
+    // 0m means there is no limit on how often we can train; if this is set to
e.g. 1h, that means
+    // that once we train a dictionary for a given table, then we can train
again after at least 1 hour.
+ String DEFAULT_TRAINING_MIN_FREQUENCY = "0m";
+
/**
* Validates value of a parameter for training purposes. The value to
validate should
* be accepted by {@link DataStorageSpec.IntKibibytesBound}. This method
is used upon validation
@@ -49,7 +55,7 @@ public interface IDictionaryCompressor<T extends
CompressionDictionary>
* @param parameterName name of a parameter to validate
* @param resolvedValue value to validate
*/
- static void validateTrainingParameter(String parameterName, String
resolvedValue)
+ static void validateSizeBasedTrainingParameter(String parameterName,
String resolvedValue)
{
try
{
@@ -62,6 +68,27 @@ public interface IDictionaryCompressor<T extends
CompressionDictionary>
}
}
+ /**
+ * Validates value of a parameter for training purposes. The value to
validate should
+ * be accepted by {@link DurationSpec.IntMinutesBound}. This method is
used upon validation of input parameters
+ * in the implementation of dictionary compressor.
+ *
+ * @param parameterName name of a parameter to validate
+ * @param resolvedValue value to validate
+ */
+ static void validateDurationBasedTrainingParameter(String parameterName,
String resolvedValue)
+ {
+ try
+ {
+ new DurationSpec.IntMinutesBound(resolvedValue).toMinutes();
+ }
+ catch (Throwable t)
+ {
+ throw new ConfigurationException(format("Unable to set value to
parameter %s: %s. Reason: %s",
+ parameterName,
resolvedValue, t.getMessage()));
+ }
+ }
+
/**
* Returns a compressor instance configured with the specified compression
dictionary.
* <br>
diff --git
a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
index 29a4131bc1..9223a8f882 100644
--- a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
@@ -39,7 +39,8 @@ import
org.apache.cassandra.db.compression.CompressionDictionary.Kind;
import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
import org.apache.cassandra.utils.concurrent.Ref;
-import static
org.apache.cassandra.io.compress.IDictionaryCompressor.validateTrainingParameter;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.validateDurationBasedTrainingParameter;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.validateSizeBasedTrainingParameter;
public class ZstdDictionaryCompressor extends ZstdCompressorBase implements
ICompressor, IDictionaryCompressor<ZstdCompressionDictionary>
{
@@ -77,12 +78,15 @@ public class ZstdDictionaryCompressor extends
ZstdCompressorBase implements ICom
{
int level = getOrDefaultCompressionLevel(options);
validateCompressionLevel(level);
- validateTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
-
options.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
-
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE));
-
validateTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
-
options.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
-
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE));
+
validateSizeBasedTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+
options.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE));
+
validateSizeBasedTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+
options.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE));
+
validateDurationBasedTrainingParameter(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
+
options.getOrDefault(TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
+
DEFAULT_TRAINING_MIN_FREQUENCY));
return getOrCreate(level, null);
}
@@ -119,7 +123,8 @@ public class ZstdDictionaryCompressor extends
ZstdCompressorBase implements ICom
{
super(level, Set.of(COMPRESSION_LEVEL_OPTION_NAME,
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
- TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME));
+ TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+ TRAINING_MIN_FREQUENCY_PARAMETER_NAME));
this.dictionary = dictionary;
this.dictionaryRef = dictionaryRef;
}
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingFrequencyTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingFrequencyTest.java
new file mode 100644
index 0000000000..c7eb86737f
--- /dev/null
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingFrequencyTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
+import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
+import org.apache.cassandra.io.compress.IDictionaryCompressor;
+import org.apache.cassandra.io.util.File;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.tools.ToolRunner;
+import org.apache.cassandra.utils.JsonUtils;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CompressionDictionaryTrainingFrequencyTest extends CQLTester
+{
+ private static final String tableName = "mytable";
+
+ @BeforeClass
+ public static void setup() throws Throwable
+ {
+ requireNetwork();
+ startJMXServer();
+ }
+
+ @Test
+ public void testTrainingFrequency() throws Throwable
+ {
+ // we can train twice when no limit is imposed
+ String tableId =
createDictTable(IDictionaryCompressor.DEFAULT_TRAINING_MIN_FREQUENCY);
+ trainDict();
+ trainDict();
+
+ assertDicts(2, tableId);
+
+ alterDictTable("5m");
+
+ assertDictTrainingFails("5m");
+
+ alterDictTable("1m");
+
+ // we can train again as 1 minute from the last training has passed
+ backdateLastDictionaryCreatedAt(tableId);
+ trainDict();
+ assertDicts(3, tableId);
+
+ // resetting back to 0, so we can train whenever we want
+ alterDictTable("0m");
+ trainDict();
+ assertDicts(4, tableId);
+
+ alterDictTable("10m");
+
+ Pair<CompressionDictionaryDataObject, File> export = export();
+ assertFailingImport(export.right);
+
+ alterDictTable("0m");
+ assertSuccessfulImport(export.right);
+ }
+
+ private String getTableId()
+ {
+ ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(keyspace(),
tableName);
+ assertThat(cfs).isNotNull();
+ return cfs.metadata.id.toLongString();
+ }
+
+ private String createDictTable(String frequency)
+ {
+ schemaChange(format("CREATE TABLE %s.%s (id int PRIMARY KEY, data
text)" +
+ " WITH compression = {'class':
'ZstdDictionaryCompressor', '%s': %s}",
+ keyspace(),
+ tableName,
+
IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
+ frequency));
+
+ return getTableId();
+ }
+
+ private void alterDictTable(String trainingFrequency)
+ {
+ schemaChange(format("ALTER TABLE %s.%s WITH compression = {'class':
'ZstdDictionaryCompressor', '%s': %s}",
+ keyspace(),
+ tableName,
+
IDictionaryCompressor.TRAINING_MIN_FREQUENCY_PARAMETER_NAME,
+ trainingFrequency));
+ }
+
+ private void assertDicts(int expectedDicts, String tableId)
+ {
+ List<LightweightCompressionDictionary> dicts =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+ assertThat(dicts).isNotNull().hasSize(expectedDicts);
+ for (int i = 0; i < expectedDicts; i++)
+ assertThat(dicts.get(i).tableId).isEqualTo(tableId);
+ }
+
+ // instead of explicit waiting, just overwrite created_at directly in the
table
+ private void backdateLastDictionaryCreatedAt(String tableId)
+ {
+ List<LightweightCompressionDictionary> dicts =
SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
+ assertThat(dicts).isNotNull();
+ assertThat(dicts).isNotEmpty();
+
+ LightweightCompressionDictionary latest = dicts.get(0);
+ long pastTimeMillis = Instant.now().minus(2,
ChronoUnit.MINUTES).toEpochMilli();
+
+ execute(format("UPDATE %s.%s SET created_at = %d WHERE keyspace_name =
'%s' AND table_name = '%s' AND table_id = '%s' AND dict_id = %d",
+ SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
+ SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
+ pastTimeMillis,
+ keyspace(),
+ tableName,
+ tableId,
+ latest.dictId.id));
+ }
+
+ private void trainDict()
+ {
+ createSSTables();
+
+ // Test training command with --force since we have limited test data
+ ToolRunner.ToolResult result = invokeNodetool("compressiondictionary",
"train", "--force", keyspace(), tableName);
+ result.assertOnCleanExit();
+
+ assertThat(result.getStdout())
+ .as("Should indicate training completed")
+ .contains("Training completed successfully")
+ .contains(keyspace())
+ .contains(tableName);
+ }
+
+ private void assertDictTrainingFails(String frequency)
+ {
+ createSSTables();
+
+ // Test training command with --force since we have limited test data
+ ToolRunner.ToolResult result = invokeNodetool("compressiondictionary",
"train", "--force", keyspace(), tableName);
+ assertThat(result.getExitCode()).isEqualTo(1);
+
+ assertThat(result.getStderr())
+ .as("Should indicate training can not be triggered")
+ .contains("The next training or importing can occur only at least
after " + frequency + " from the last training which happened");
+
+ String failingMessage = Arrays.stream(result.getStderr()
+
.split(System.lineSeparator()))
+ .filter(p -> p.contains("The next
training or importing can occur only at least"))
+ .findFirst()
+ .orElseThrow(() -> new
RuntimeException("Unable to find failing message"));
+
+ String pattern = "Failed to trigger training: The next training or
importing can occur only at least after " +
+ "(.*) from the last training which happened at (.*).
" +
+ "You can train again no earlier than at (.*).";
+ Matcher matcher = Pattern.compile(pattern).matcher(failingMessage);
+
+ assertThat(matcher.matches()).isTrue();
+
+ DurationSpec.IntMinutesBound frequencySpec = new
DurationSpec.IntMinutesBound(matcher.group(1));
+ Instant lastTraining = Instant.parse(matcher.group(2));
+ Instant earliestNextTraining = Instant.parse(matcher.group(3));
+
+ assertThat(earliestNextTraining).isAfter(lastTraining);
+ assertThat(earliestNextTraining.minus(frequencySpec.toMinutes(),
ChronoUnit.MINUTES)).isAfterOrEqualTo(lastTraining);
+ }
+
+ private void assertFailingImport(File file)
+ {
+ ToolRunner.ToolResult result = invokeNodetool("compressiondictionary",
"import", file.absolutePath());
+ assertThat(result.getExitCode()).isEqualTo(1);
+ }
+
+ private void assertSuccessfulImport(File file)
+ {
+ ToolRunner.ToolResult result = invokeNodetool("compressiondictionary",
"import", file.absolutePath());
+ result.assertOnCleanExit();
+ }
+
+ private void createSSTables()
+ {
+ for (int file = 0; file < 10; file++)
+ {
+ int batchSize = 1000;
+ for (int i = 0; i < batchSize; i++)
+ {
+ int index = i + file * batchSize;
+ executeFormattedQuery(format("INSERT INTO %s.%s (id, data)
VALUES (?, ?)", keyspace(), tableName),
+ index, "test data " + index);
+ }
+
+ flush();
+ }
+ }
+
+ private Pair<CompressionDictionaryDataObject, File> export() throws
Throwable
+ {
+ File dictionaryFile = FileUtils.createTempFile("zstd-dictionary-",
".dict");
+ ToolRunner.ToolResult result;
+
+ result = invokeNodetool("compressiondictionary", "export", keyspace(),
tableName, dictionaryFile.absolutePath());
+ result.assertOnCleanExit();
+
+ CompressionDictionaryDataObject dataObject =
JsonUtils.deserializeFromJsonFile(CompressionDictionaryDataObject.class,
dictionaryFile);
+
+ assertThat(dictionaryFile.exists()).isTrue();
+ assertThat(dictionaryFile.length()).isGreaterThan(0);
+
+ return Pair.create(dataObject, dictionaryFile);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]