http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CachingParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CachingParams.java b/src/java/org/apache/cassandra/schema/CachingParams.java new file mode 100644 index 0000000..2b5ab7c --- /dev/null +++ b/src/java/org/apache/cassandra/schema/CachingParams.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; + +import org.apache.cassandra.exceptions.ConfigurationException; + +import static java.lang.String.format; + +// CQL: {'keys' : 'ALL'|'NONE', 'rows_per_partition': '200'|'NONE'|'ALL'} +public final class CachingParams +{ + public enum Option + { + KEYS, + ROWS_PER_PARTITION; + + @Override + public String toString() + { + return name().toLowerCase(); + } + } + + private static final String ALL = "ALL"; + private static final String NONE = "NONE"; + + static final boolean DEFAULT_CACHE_KEYS = true; + static final int DEFAULT_ROWS_PER_PARTITION_TO_CACHE = 0; + + public static final CachingParams CACHE_NOTHING = new CachingParams(false, 0); + public static final CachingParams CACHE_KEYS = new CachingParams(true, 0); + public static final CachingParams CACHE_EVERYTHING = new CachingParams(true, Integer.MAX_VALUE); + + static final CachingParams DEFAULT = new CachingParams(DEFAULT_CACHE_KEYS, DEFAULT_ROWS_PER_PARTITION_TO_CACHE); + + final boolean cacheKeys; + final int rowsPerPartitionToCache; + + public CachingParams(boolean cacheKeys, int rowsPerPartitionToCache) + { + this.cacheKeys = cacheKeys; + this.rowsPerPartitionToCache = rowsPerPartitionToCache; + } + + public boolean cacheKeys() + { + return cacheKeys; + } + + public boolean cacheRows() + { + return rowsPerPartitionToCache > 0; + } + + public boolean cacheAllRows() + { + return rowsPerPartitionToCache == Integer.MAX_VALUE; + } + + public int rowsPerPartitionToCache() + { + return rowsPerPartitionToCache; + } + + public static CachingParams fromMap(Map<String, String> map) + { + Map<String, String> copy = new HashMap<>(map); + + String keys = copy.remove(Option.KEYS.toString()); + boolean cacheKeys = keys != null && keysFromString(keys); + + String rows = 
copy.remove(Option.ROWS_PER_PARTITION.toString()); + int rowsPerPartitionToCache = rows == null + ? 0 + : rowsPerPartitionFromString(rows); + + if (!copy.isEmpty()) + { + throw new ConfigurationException(format("Invalid caching sub-options %s: only '%s' and '%s' are allowed", + copy.keySet(), + Option.KEYS, + Option.ROWS_PER_PARTITION)); + } + + return new CachingParams(cacheKeys, rowsPerPartitionToCache); + } + + public Map<String, String> asMap() + { + return ImmutableMap.of(Option.KEYS.toString(), + keysAsString(), + Option.ROWS_PER_PARTITION.toString(), + rowsPerPartitionAsString()); + } + + private static boolean keysFromString(String value) + { + if (value.equalsIgnoreCase(ALL)) + return true; + + if (value.equalsIgnoreCase(NONE)) + return false; + + throw new ConfigurationException(format("Invalid value '%s' for caching sub-option '%s': only '%s' and '%s' are allowed", + value, + Option.KEYS, + ALL, + NONE)); + } + + String keysAsString() + { + return cacheKeys ? ALL : NONE; + } + + private static int rowsPerPartitionFromString(String value) + { + if (value.equalsIgnoreCase(ALL)) + return Integer.MAX_VALUE; + + if (value.equalsIgnoreCase(NONE)) + return 0; + + if (StringUtils.isNumeric(value)) + return Integer.parseInt(value); + + throw new ConfigurationException(format("Invalid value '%s' for caching sub-option '%s':" + + " only '%s', '%s', and integer values are allowed", + value, + Option.ROWS_PER_PARTITION, + ALL, + NONE)); + } + + String rowsPerPartitionAsString() + { + if (rowsPerPartitionToCache == 0) + return NONE; + else if (rowsPerPartitionToCache == Integer.MAX_VALUE) + return ALL; + else + return Integer.toString(rowsPerPartitionToCache); + } + + @Override + public String toString() + { + return format("{'%s' : '%s', '%s' : '%s'}", + Option.KEYS, + keysAsString(), + Option.ROWS_PER_PARTITION, + rowsPerPartitionAsString()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof 
CachingParams)) + return false; + + CachingParams c = (CachingParams) o; + + return cacheKeys == c.cacheKeys && rowsPerPartitionToCache == c.rowsPerPartitionToCache; + } + + @Override + public int hashCode() + { + return Objects.hashCode(cacheKeys, rowsPerPartitionToCache); + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CompactionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompactionParams.java b/src/java/org/apache/cassandra/schema/CompactionParams.java new file mode 100644 index 0000000..720efa3 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/CompactionParams.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; +import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.FBUtilities; + +import static java.lang.String.format; + +public final class CompactionParams +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionParams.class); + + public enum Option + { + CLASS, + ENABLED, + MIN_THRESHOLD, + MAX_THRESHOLD; + + @Override + public String toString() + { + return name().toLowerCase(); + } + } + + public static final int DEFAULT_MIN_THRESHOLD = 4; + public static final int DEFAULT_MAX_THRESHOLD = 32; + + public static final boolean DEFAULT_ENABLED = true; + + public static final Map<String, String> DEFAULT_THRESHOLDS = + ImmutableMap.of(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD), + Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD)); + + public static final CompactionParams DEFAULT = + new CompactionParams(SizeTieredCompactionStrategy.class, DEFAULT_THRESHOLDS, DEFAULT_ENABLED); + + private final Class<? extends AbstractCompactionStrategy> klass; + private final ImmutableMap<String, String> options; + private final boolean isEnabled; + + private CompactionParams(Class<? 
extends AbstractCompactionStrategy> klass, Map<String, String> options, boolean isEnabled) + { + this.klass = klass; + this.options = ImmutableMap.copyOf(options); + this.isEnabled = isEnabled; + } + + public static CompactionParams create(Class<? extends AbstractCompactionStrategy> klass, Map<String, String> options) + { + boolean isEnabled = options.containsKey(Option.ENABLED.toString()) + ? Boolean.parseBoolean(options.get(Option.ENABLED.toString())) + : DEFAULT_ENABLED; + + Map<String, String> allOptions = new HashMap<>(options); + if (supportsThresholdParams(klass)) + { + allOptions.putIfAbsent(Option.MIN_THRESHOLD.toString(), Integer.toString(DEFAULT_MIN_THRESHOLD)); + allOptions.putIfAbsent(Option.MAX_THRESHOLD.toString(), Integer.toString(DEFAULT_MAX_THRESHOLD)); + } + + return new CompactionParams(klass, allOptions, isEnabled); + } + + public static CompactionParams scts(Map<String, String> options) + { + return create(SizeTieredCompactionStrategy.class, options); + } + + public static CompactionParams lcs(Map<String, String> options) + { + return create(LeveledCompactionStrategy.class, options); + } + + public int minCompactionThreshold() + { + String threshold = options.get(Option.MIN_THRESHOLD.toString()); + return threshold == null + ? DEFAULT_MIN_THRESHOLD + : Integer.parseInt(threshold); + } + + public int maxCompactionThreshold() + { + String threshold = options.get(Option.MAX_THRESHOLD.toString()); + return threshold == null + ? DEFAULT_MAX_THRESHOLD + : Integer.parseInt(threshold); + } + + public void validate() + { + try + { + Map<?, ?> unknownOptions = (Map) klass.getMethod("validateOptions", Map.class).invoke(null, options); + if (!unknownOptions.isEmpty()) + { + throw new ConfigurationException(format("Properties specified %s are not understood by %s", + unknownOptions.keySet(), + klass.getSimpleName())); + } + } + catch (NoSuchMethodException e) + { + logger.warn("Compaction strategy {} does not have a static validateOptions method. 
Validation ignored", + klass.getName()); + } + catch (InvocationTargetException e) + { + if (e.getTargetException() instanceof ConfigurationException) + throw (ConfigurationException) e.getTargetException(); + + Throwable cause = e.getCause() == null + ? e + : e.getCause(); + + throw new ConfigurationException(format("%s.validateOptions() threw an error: %s %s", + klass.getName(), + cause.getClass().getName(), + cause.getMessage()), + e); + } + catch (IllegalAccessException e) + { + throw new ConfigurationException("Cannot access method validateOptions in " + klass.getName(), e); + } + + String minThreshold = options.get(Option.MIN_THRESHOLD.toString()); + if (minThreshold != null && !StringUtils.isNumeric(minThreshold)) + { + throw new ConfigurationException(format("Invalid value %s for '%s' compaction sub-option - must be an integer", + minThreshold, + Option.MIN_THRESHOLD)); + } + + String maxThreshold = options.get(Option.MAX_THRESHOLD.toString()); + if (maxThreshold != null && !StringUtils.isNumeric(maxThreshold)) + { + throw new ConfigurationException(format("Invalid value %s for '%s' compaction sub-option - must be an integer", + maxThreshold, + Option.MAX_THRESHOLD)); + } + + if (minCompactionThreshold() <= 0 || maxCompactionThreshold() <= 0) + { + throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been removed," + + " set the compaction option 'enabled' to false instead."); + } + + if (minCompactionThreshold() <= 1) + { + throw new ConfigurationException(format("Min compaction threshold cannot be less than 2 (got %d)", + minCompactionThreshold())); + } + + if (minCompactionThreshold() > maxCompactionThreshold()) + { + throw new ConfigurationException(format("Min compaction threshold (got %d) cannot be greater than max compaction threshold (got %d)", + minCompactionThreshold(), + maxCompactionThreshold())); + } + } + + double defaultBloomFilterFbChance() + { + return klass.equals(LeveledCompactionStrategy.class) 
? 0.1 : 0.01; + } + + public Class<? extends AbstractCompactionStrategy> klass() + { + return klass; + } + + /** + * All strategy options - excluding 'class'. + */ + public Map<String, String> options() + { + return options; + } + + public boolean isEnabled() + { + return isEnabled; + } + + public static CompactionParams fromMap(Map<String, String> map) + { + Map<String, String> options = new HashMap<>(map); + + String className = options.remove(Option.CLASS.toString()); + if (className == null) + { + throw new ConfigurationException(format("Missing sub-option '%s' for the '%s' option", + Option.CLASS, + TableParams.Option.COMPACTION)); + } + + return create(classFromName(className), options); + } + + private static Class<? extends AbstractCompactionStrategy> classFromName(String name) + { + String className = name.contains(".") + ? name + : "org.apache.cassandra.db.compaction." + name; + Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy"); + + if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass)) + { + throw new ConfigurationException(format("Compaction strategy class %s is not derived from AbstractReplicationStrategy", + className)); + } + + return strategyClass; + } + + /* + * LCS doesn't, STCS and DTCS do + */ + @SuppressWarnings("unchecked") + public static boolean supportsThresholdParams(Class<? 
extends AbstractCompactionStrategy> klass) + { + try + { + Map<String, String> unrecognizedOptions = + (Map<String, String>) klass.getMethod("validateOptions", Map.class) + .invoke(null, DEFAULT_THRESHOLDS); + + return unrecognizedOptions.isEmpty(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public Map<String, String> asMap() + { + Map<String, String> map = new HashMap<>(options()); + map.put(Option.CLASS.toString(), klass.getName()); + return map; + } + + @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .add("class", klass.getName()) + .add("options", options) + .toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof CompactionParams)) + return false; + + CompactionParams cp = (CompactionParams) o; + + return klass.equals(cp.klass) && options.equals(cp.options); + } + + @Override + public int hashCode() + { + return Objects.hash(klass, options); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/CompressionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java new file mode 100644 index 0000000..a73fcd1 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.compress.*; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +import static java.lang.String.format; + +@SuppressWarnings("deprecation") +public final class CompressionParams +{ + private static final Logger logger = LoggerFactory.getLogger(CompressionParams.class); + + private static volatile boolean hasLoggedSsTableCompressionWarning; + private static volatile boolean hasLoggedChunkLengthWarning; + + public static final int DEFAULT_CHUNK_LENGTH = 65536; + public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0; + public static final IVersionedSerializer<CompressionParams> serializer = new Serializer(); + + public static final String CLASS = "class"; + public static final String CHUNK_LENGTH_IN_KB = 
"chunk_length_in_kb"; + public static final String ENABLED = "enabled"; + + public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.instance, + DEFAULT_CHUNK_LENGTH, + Collections.emptyMap()); + + @Deprecated public static final String SSTABLE_COMPRESSION = "sstable_compression"; + @Deprecated public static final String CHUNK_LENGTH_KB = "chunk_length_kb"; + + public static final String CRC_CHECK_CHANCE = "crc_check_chance"; + + public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE); + + private final ICompressor sstableCompressor; + private final Integer chunkLength; + private volatile double crcCheckChance; + private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor + private CFMetaData liveMetadata; + + public static CompressionParams fromMap(Map<String, String> opts) + { + Map<String, String> options = copyOptions(opts); + + String sstableCompressionClass; + + if (!opts.isEmpty() && isEnabled(opts) && !containsSstableCompressionClass(opts)) + throw new ConfigurationException(format("Missing sub-option '%s' for the 'compression' option.", CLASS)); + + if (!removeEnabled(options)) + { + sstableCompressionClass = null; + + if (!options.isEmpty()) + throw new ConfigurationException(format("If the '%s' option is set to false no other options must be specified", ENABLED)); + } + else + { + sstableCompressionClass = removeSstableCompressionClass(options); + } + + Integer chunkLength = removeChunkLength(options); + + CompressionParams cp = new CompressionParams(sstableCompressionClass, chunkLength, options); + cp.validate(); + + return cp; + } + + public Class<? 
extends ICompressor> klass() + { + return sstableCompressor.getClass(); + } + + public static CompressionParams noCompression() + { + return new CompressionParams((ICompressor) null, DEFAULT_CHUNK_LENGTH, Collections.emptyMap()); + } + + public static CompressionParams snappy() + { + return snappy(null); + } + + public static CompressionParams snappy(Integer chunkLength) + { + return new CompressionParams(SnappyCompressor.instance, chunkLength, Collections.emptyMap()); + } + + public static CompressionParams deflate() + { + return deflate(null); + } + + public static CompressionParams deflate(Integer chunkLength) + { + return new CompressionParams(DeflateCompressor.instance, chunkLength, Collections.emptyMap()); + } + + public static CompressionParams lz4() + { + return lz4(null); + } + + public static CompressionParams lz4(Integer chunkLength) + { + return new CompressionParams(LZ4Compressor.instance, chunkLength, Collections.emptyMap()); + } + + public CompressionParams(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException + { + this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions); + } + + private CompressionParams(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException + { + this.sstableCompressor = sstableCompressor; + this.chunkLength = chunkLength; + this.otherOptions = ImmutableMap.copyOf(otherOptions); + String chance = otherOptions.get(CRC_CHECK_CHANCE); + this.crcCheckChance = (chance == null) ? 
DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance); + } + + public CompressionParams copy() + { + return new CompressionParams(sstableCompressor, chunkLength, otherOptions); + } + + public void setLiveMetadata(final CFMetaData liveMetadata) + { + if (liveMetadata == null) + return; + + this.liveMetadata = liveMetadata; + } + + public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException + { + validateCrcCheckChance(crcCheckChance); + this.crcCheckChance = crcCheckChance; + + if (liveMetadata != null && this != liveMetadata.params.compression) + liveMetadata.params.compression.setCrcCheckChance(crcCheckChance); + } + + /** + * Checks if compression is enabled. + * @return {@code true} if compression is enabled, {@code false} otherwise. + */ + public boolean isEnabled() + { + return sstableCompressor != null; + } + + /** + * Returns the SSTable compressor. + * @return the SSTable compressor or {@code null} if compression is disabled. + */ + public ICompressor getSstableCompressor() + { + return sstableCompressor; + } + + public ImmutableMap<String, String> getOtherOptions() + { + return otherOptions; + } + + public double getCrcCheckChance() + { + return liveMetadata == null ? this.crcCheckChance : liveMetadata.params.compression.crcCheckChance; + } + + private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException + { + try + { + double chance = Double.parseDouble(crcCheckChance); + validateCrcCheckChance(chance); + return chance; + } + catch (NumberFormatException e) + { + throw new ConfigurationException("crc_check_chance should be a double"); + } + } + + private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException + { + if (crcCheckChance < 0.0d || crcCheckChance > 1.0d) + throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0"); + } + + public int chunkLength() + { + return chunkLength == null ? 
DEFAULT_CHUNK_LENGTH : chunkLength; + } + + private static Class<?> parseCompressorClass(String className) throws ConfigurationException + { + if (className == null || className.isEmpty()) + return null; + + className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className; + try + { + return Class.forName(className); + } + catch (Exception e) + { + throw new ConfigurationException("Could not create Compression for type " + className, e); + } + } + + private static ICompressor createCompressor(Class<?> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException + { + if (compressorClass == null) + { + if (!compressionOptions.isEmpty()) + throw new ConfigurationException("Unknown compression options (" + compressionOptions.keySet() + ") since no compression class found"); + return null; + } + + try + { + Method method = compressorClass.getMethod("create", Map.class); + ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions); + // Check for unknown options + AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS); + for (String provided : compressionOptions.keySet()) + if (!supportedOpts.contains(provided)) + throw new ConfigurationException("Unknown compression options " + provided); + return compressor; + } + catch (NoSuchMethodException e) + { + throw new ConfigurationException("create method not found", e); + } + catch (SecurityException e) + { + throw new ConfigurationException("Access forbiden", e); + } + catch (IllegalAccessException e) + { + throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e); + } + catch (InvocationTargetException e) + { + if (e.getTargetException() instanceof ConfigurationException) + throw (ConfigurationException) e.getTargetException(); + + Throwable cause = e.getCause() == null + ? 
e + : e.getCause(); + + throw new ConfigurationException(format("%s.create() threw an error: %s %s", + compressorClass.getSimpleName(), + cause.getClass().getName(), + cause.getMessage()), + e); + } + catch (ExceptionInInitializerError e) + { + throw new ConfigurationException("Cannot initialize class " + compressorClass.getName()); + } + } + + public static ICompressor createCompressor(ParameterizedClass compression) throws ConfigurationException { + return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); + } + + private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co) + { + if (co == null || co.isEmpty()) + return Collections.<String, String>emptyMap(); + + Map<String, String> compressionOptions = new HashMap<>(); + for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet()) + compressionOptions.put(entry.getKey().toString(), entry.getValue().toString()); + return compressionOptions; + } + + /** + * Parse the chunk length (in KB) and returns it as bytes. + * + * @param chLengthKB the length of the chunk to parse + * @return the chunk length in bytes + * @throws ConfigurationException if the chunk size is too large + */ + private static Integer parseChunkLength(String chLengthKB) throws ConfigurationException + { + if (chLengthKB == null) + return null; + + try + { + int parsed = Integer.parseInt(chLengthKB); + if (parsed > Integer.MAX_VALUE / 1024) + throw new ConfigurationException(format("Value of %s is too large (%s)", CHUNK_LENGTH_IN_KB,parsed)); + return 1024 * parsed; + } + catch (NumberFormatException e) + { + throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_IN_KB, e); + } + } + + /** + * Removes the chunk length option from the specified set of option. 
+ * + * @param options the options + * @return the chunk length value + */ + private static Integer removeChunkLength(Map<String, String> options) + { + if (options.containsKey(CHUNK_LENGTH_IN_KB)) + { + if (options.containsKey(CHUNK_LENGTH_KB)) + { + throw new ConfigurationException(format("The '%s' option must not be used if the chunk length is already specified by the '%s' option", + CHUNK_LENGTH_KB, + CHUNK_LENGTH_IN_KB)); + } + + return parseChunkLength(options.remove(CHUNK_LENGTH_IN_KB)); + } + + if (options.containsKey(CHUNK_LENGTH_KB)) + { + if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning) + { + hasLoggedChunkLengthWarning = true; + logger.warn(format("The %s option has been deprecated. You should use %s instead", + CHUNK_LENGTH_KB, + CHUNK_LENGTH_IN_KB)); + } + + return parseChunkLength(options.remove(CHUNK_LENGTH_KB)); + } + + return null; + } + + /** + * Returns {@code true} if the specified options contains the name of the compression class to be used, + * {@code false} otherwise. + * + * @param options the options + * @return {@code true} if the specified options contains the name of the compression class to be used, + * {@code false} otherwise. 
+ */ + public static boolean containsSstableCompressionClass(Map<String, String> options) + { + return options.containsKey(CLASS) || options.containsKey(SSTABLE_COMPRESSION); + } + + /** + * Removes the option specifying the name of the compression class + * + * @param options the options + * @return the name of the compression class + */ + private static String removeSstableCompressionClass(Map<String, String> options) + { + if (options.containsKey(CLASS)) + { + if (options.containsKey(SSTABLE_COMPRESSION)) + throw new ConfigurationException(format("The '%s' option must not be used if the compression algorithm is already specified by the '%s' option", + SSTABLE_COMPRESSION, + CLASS)); + + String clazz = options.remove(CLASS); + if (clazz.isEmpty()) + throw new ConfigurationException(format("The '%s' option must not be empty. To disable compression use 'enabled' : false", CLASS)); + + return clazz; + } + + if (options.containsKey(SSTABLE_COMPRESSION) && !hasLoggedSsTableCompressionWarning) + { + hasLoggedSsTableCompressionWarning = true; + logger.warn(format("The %s option has been deprecated. You should use %s instead", + SSTABLE_COMPRESSION, + CLASS)); + } + + return options.remove(SSTABLE_COMPRESSION); + } + + /** + * Returns {@code true} if the options contains the {@code enabled} option and that its value is + * {@code true}, otherwise returns {@code false}. + * + * @param options the options + * @return {@code true} if the options contains the {@code enabled} option and that its value is + * {@code true}, otherwise returns {@code false}. + */ + public static boolean isEnabled(Map<String, String> options) + { + String enabled = options.get(ENABLED); + return enabled == null || Boolean.parseBoolean(enabled); + } + + /** + * Removes the {@code enabled} option from the specified options. 
+ * + * @param options the options + * @return the value of the {@code enabled} option + */ + private static boolean removeEnabled(Map<String, String> options) + { + String enabled = options.remove(ENABLED); + return enabled == null || Boolean.parseBoolean(enabled); + } + + // chunkLength must be a power of 2 because we assume so when + // computing the chunk number from an uncompressed file offset (see + // CompressedRandomAccessReader.decompresseChunk()) + public void validate() throws ConfigurationException + { + // if chunk length was not set (chunkLength == null), this is fine, default will be used + if (chunkLength != null) + { + if (chunkLength <= 0) + throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_IN_KB); + + int c = chunkLength; + boolean found = false; + while (c != 0) + { + if ((c & 0x01) != 0) + { + if (found) + throw new ConfigurationException(CHUNK_LENGTH_IN_KB + " must be a power of 2"); + else + found = true; + } + c >>= 1; + } + } + + validateCrcCheckChance(crcCheckChance); + } + + public Map<String, String> asMap() + { + if (!isEnabled()) + return Collections.singletonMap(ENABLED, "false"); + + Map<String, String> options = new HashMap<>(otherOptions); + options.put(CLASS, sstableCompressor.getClass().getName()); + options.put(CHUNK_LENGTH_IN_KB, chunkLengthInKB()); + + return options; + } + + public String chunkLengthInKB() + { + return String.valueOf(chunkLength() / 1024); + } + + @Override + public boolean equals(Object obj) + { + if (obj == this) + { + return true; + } + else if (obj == null || obj.getClass() != getClass()) + { + return false; + } + + CompressionParams cp = (CompressionParams) obj; + return new EqualsBuilder() + .append(sstableCompressor, cp.sstableCompressor) + .append(chunkLength(), cp.chunkLength()) + .append(otherOptions, cp.otherOptions) + .isEquals(); + } + + @Override + public int hashCode() + { + return new HashCodeBuilder(29, 1597) + .append(sstableCompressor) + .append(chunkLength()) + 
.append(otherOptions) + .toHashCode(); + } + + static class Serializer implements IVersionedSerializer<CompressionParams> + { + public void serialize(CompressionParams parameters, DataOutputPlus out, int version) throws IOException + { + out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName()); + out.writeInt(parameters.otherOptions.size()); + for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet()) + { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + out.writeInt(parameters.chunkLength()); + } + + public CompressionParams deserialize(DataInputPlus in, int version) throws IOException + { + String compressorName = in.readUTF(); + int optionCount = in.readInt(); + Map<String, String> options = new HashMap<>(); + for (int i = 0; i < optionCount; ++i) + { + String key = in.readUTF(); + String value = in.readUTF(); + options.put(key, value); + } + int chunkLength = in.readInt(); + CompressionParams parameters; + try + { + parameters = new CompressionParams(compressorName, chunkLength, options); + } + catch (ConfigurationException e) + { + throw new RuntimeException("Cannot create CompressionParams for parameters", e); + } + return parameters; + } + + public long serializedSize(CompressionParams parameters, int version) + { + long size = TypeSizes.sizeof(parameters.sstableCompressor.getClass().getSimpleName()); + size += TypeSizes.sizeof(parameters.otherOptions.size()); + for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet()) + { + size += TypeSizes.sizeof(entry.getKey()); + size += TypeSizes.sizeof(entry.getValue()); + } + size += TypeSizes.sizeof(parameters.chunkLength()); + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/KeyspaceParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java 
b/src/java/org/apache/cassandra/schema/KeyspaceParams.java index a8de2bd..6cdf27f 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java @@ -17,15 +17,9 @@ */ package org.apache.cassandra.schema; -import java.util.HashMap; import java.util.Map; import com.google.common.base.Objects; -import com.google.common.collect.ImmutableMap; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.service.StorageService; /** * An immutable class representing keyspace parameters (durability and replication). @@ -47,9 +41,9 @@ public final class KeyspaceParams } public final boolean durableWrites; - public final Replication replication; + public final ReplicationParams replication; - public KeyspaceParams(boolean durableWrites, Replication replication) + public KeyspaceParams(boolean durableWrites, ReplicationParams replication) { this.durableWrites = durableWrites; this.replication = replication; @@ -57,22 +51,22 @@ public final class KeyspaceParams public static KeyspaceParams create(boolean durableWrites, Map<String, String> replication) { - return new KeyspaceParams(durableWrites, Replication.fromMap(replication)); + return new KeyspaceParams(durableWrites, ReplicationParams.fromMap(replication)); } public static KeyspaceParams local() { - return new KeyspaceParams(true, Replication.local()); + return new KeyspaceParams(true, ReplicationParams.local()); } public static KeyspaceParams simple(int replicationFactor) { - return new KeyspaceParams(true, Replication.simple(replicationFactor)); + return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor)); } public static KeyspaceParams simpleTransient(int replicationFactor) { - return new KeyspaceParams(false, Replication.simple(replicationFactor)); + return new KeyspaceParams(false, ReplicationParams.simple(replicationFactor)); } public void validate(String name) @@ 
-108,81 +102,4 @@ public final class KeyspaceParams .add(Option.REPLICATION.toString(), replication) .toString(); } - - public static final class Replication - { - public static String CLASS = "class"; - - public final Class<? extends AbstractReplicationStrategy> klass; - public final ImmutableMap<String, String> options; - - private Replication(Class<? extends AbstractReplicationStrategy> klass, Map<String, String> options) - { - this.klass = klass; - this.options = ImmutableMap.copyOf(options); - } - - private static Replication local() - { - return new Replication(LocalStrategy.class, ImmutableMap.of()); - } - - private static Replication simple(int replicationFactor) - { - return new Replication(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor))); - } - - public void validate(String name) - { - // Attempt to instantiate the ARS, which will throw a ConfigurationException if the options aren't valid. - TokenMetadata tmd = StorageService.instance.getTokenMetadata(); - IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch(); - AbstractReplicationStrategy.validateReplicationStrategy(name, klass, tmd, eps, options); - } - - public static Replication fromMap(Map<String, String> map) - { - Map<String, String> options = new HashMap<>(map); - String className = options.remove(CLASS); - Class<? 
extends AbstractReplicationStrategy> klass = AbstractReplicationStrategy.getClass(className); - return new Replication(klass, options); - } - - public Map<String, String> asMap() - { - Map<String, String> map = new HashMap<>(options); - map.put(CLASS, klass.getName()); - return map; - } - - @Override - public boolean equals(Object o) - { - if (this == o) - return true; - - if (!(o instanceof Replication)) - return false; - - Replication r = (Replication) o; - - return klass.equals(r.klass) && options.equals(r.options); - } - - @Override - public int hashCode() - { - return Objects.hashCode(klass, options); - } - - @Override - public String toString() - { - Objects.ToStringHelper helper = Objects.toStringHelper(this); - helper.add(CLASS, klass.getName()); - for (Map.Entry<String, String> entry : options.entrySet()) - helper.add(entry.getKey(), entry.getValue()); - return helper.toString(); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 41da481..8dac03b 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -26,18 +26,17 @@ import com.google.common.collect.ImmutableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.FunctionName; import org.apache.cassandra.cql3.functions.UDAggregate; import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.marshal.*; import 
org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -189,7 +188,7 @@ public final class LegacySchemaMigrator Map<String, String> replication = new HashMap<>(); replication.putAll(fromJsonMap(row.getString("strategy_options"))); - replication.put(KeyspaceParams.Replication.CLASS, row.getString("strategy_class")); + replication.put(ReplicationParams.CLASS, row.getString("strategy_class")); return KeyspaceParams.create(durableWrites, replication); } @@ -317,41 +316,86 @@ public final class LegacySchemaMigrator columnDefs, DatabaseDescriptor.getPartitioner()); - cfm.readRepairChance(tableRow.getDouble("read_repair_chance")); - cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance")); - cfm.gcGraceSeconds(tableRow.getInt("gc_grace_seconds")); - cfm.minCompactionThreshold(tableRow.getInt("min_compaction_threshold")); - cfm.maxCompactionThreshold(tableRow.getInt("max_compaction_threshold")); - if (tableRow.has("comment")) - cfm.comment(tableRow.getString("comment")); - if (tableRow.has("memtable_flush_period_in_ms")) - cfm.memtableFlushPeriod(tableRow.getInt("memtable_flush_period_in_ms")); - cfm.caching(CachingOptions.fromString(tableRow.getString("caching"))); - if (tableRow.has("default_time_to_live")) - cfm.defaultTimeToLive(tableRow.getInt("default_time_to_live")); - if (tableRow.has("speculative_retry")) - cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(tableRow.getString("speculative_retry"))); - cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(tableRow.getString("compaction_strategy_class"))); - 
cfm.compressionParameters(CompressionParameters.fromMap(fromJsonMap(tableRow.getString("compression_parameters")))); - cfm.compactionStrategyOptions(fromJsonMap(tableRow.getString("compaction_strategy_options"))); - - if (tableRow.has("min_index_interval")) - cfm.minIndexInterval(tableRow.getInt("min_index_interval")); - - if (tableRow.has("max_index_interval")) - cfm.maxIndexInterval(tableRow.getInt("max_index_interval")); - - if (tableRow.has("bloom_filter_fp_chance")) - cfm.bloomFilterFpChance(tableRow.getDouble("bloom_filter_fp_chance")); - else - cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance()); - if (tableRow.has("dropped_columns")) addDroppedColumns(cfm, rawComparator, tableRow.getMap("dropped_columns", UTF8Type.instance, LongType.instance)); - cfm.triggers(createTriggersFromTriggerRows(triggerRows)); + return cfm.params(decodeTableParams(tableRow)) + .triggers(createTriggersFromTriggerRows(triggerRows)); + } + + private static TableParams decodeTableParams(UntypedResultSet.Row row) + { + TableParams.Builder params = TableParams.builder(); + + params.readRepairChance(row.getDouble("read_repair_chance")) + .dcLocalReadRepairChance(row.getDouble("local_read_repair_chance")) + .gcGraceSeconds(row.getInt("gc_grace_seconds")); + + if (row.has("comment")) + params.comment(row.getString("comment")); + + if (row.has("memtable_flush_period_in_ms")) + params.memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")); + + params.caching(CachingParams.fromMap(fromJsonMap(row.getString("caching")))); + + if (row.has("default_time_to_live")) + params.defaultTimeToLive(row.getInt("default_time_to_live")); + + if (row.has("speculative_retry")) + params.speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))); + + params.compression(CompressionParams.fromMap(fromJsonMap(row.getString("compression_parameters")))); + + params.compaction(compactionFromRow(row)); + + if (row.has("min_index_interval")) + 
params.minIndexInterval(row.getInt("min_index_interval")); + + if (row.has("max_index_interval")) + params.maxIndexInterval(row.getInt("max_index_interval")); + + if (row.has("bloom_filter_fp_chance")) + params.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")); + + return params.build(); + } - return cfm; + /* + * The method is needed - to migrate max_compaction_threshold and min_compaction_threshold + * to the compaction map, where they belong. + * + * We must use reflection to validate the options because not every compaction strategy respects and supports + * the threshold params (LCS doesn't, STCS and DTCS do). + */ + @SuppressWarnings("unchecked") + private static CompactionParams compactionFromRow(UntypedResultSet.Row row) + { + Class<? extends AbstractCompactionStrategy> klass = + CFMetaData.createCompactionStrategy(row.getString("compaction_strategy_class")); + Map<String, String> options = fromJsonMap(row.getString("compaction_strategy_options")); + + int minThreshold = row.getInt("min_compaction_threshold"); + int maxThreshold = row.getInt("max_compaction_threshold"); + + Map<String, String> optionsWithThresholds = new HashMap<>(options); + optionsWithThresholds.putIfAbsent(CompactionParams.Option.MIN_THRESHOLD.toString(), Integer.toString(minThreshold)); + optionsWithThresholds.putIfAbsent(CompactionParams.Option.MAX_THRESHOLD.toString(), Integer.toString(maxThreshold)); + + try + { + Map<String, String> unrecognizedOptions = + (Map<String, String>) klass.getMethod("validateOptions", Map.class).invoke(null, optionsWithThresholds); + + if (unrecognizedOptions.isEmpty()) + options = optionsWithThresholds; + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + return CompactionParams.create(klass, options); } // Should only be called on compact tables @@ -627,10 +671,7 @@ public final class LegacySchemaMigrator SystemKeyspace.NAME, SystemKeyspace.LEGACY_FUNCTIONS); HashMultimap<String, List<String>> functionSignatures = 
HashMultimap.create(); - query(query, keyspaceName).forEach(row -> - { - functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance)); - }); + query(query, keyspaceName).forEach(row -> functionSignatures.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance))); Collection<Function> functions = new ArrayList<>(); functionSignatures.entries().forEach(pair -> functions.add(readFunction(keyspaceName, pair.getKey(), pair.getValue()))); @@ -699,10 +740,7 @@ public final class LegacySchemaMigrator SystemKeyspace.NAME, SystemKeyspace.LEGACY_AGGREGATES); HashMultimap<String, List<String>> aggregateSignatures = HashMultimap.create(); - query(query, keyspaceName).forEach(row -> - { - aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance)); - }); + query(query, keyspaceName).forEach(row -> aggregateSignatures.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance))); Collection<Aggregate> aggregates = new ArrayList<>(); aggregateSignatures.entries().forEach(pair -> aggregates.add(readAggregate(keyspaceName, pair.getKey(), pair.getValue()))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/ReplicationParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/ReplicationParams.java b/src/java/org/apache/cassandra/schema/ReplicationParams.java new file mode 100644 index 0000000..cdeb4c2 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/ReplicationParams.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.*; +import org.apache.cassandra.service.StorageService; + +public final class ReplicationParams +{ + public static final String CLASS = "class"; + + public final Class<? extends AbstractReplicationStrategy> klass; + public final ImmutableMap<String, String> options; + + private ReplicationParams(Class<? extends AbstractReplicationStrategy> klass, Map<String, String> options) + { + this.klass = klass; + this.options = ImmutableMap.copyOf(options); + } + + static ReplicationParams local() + { + return new ReplicationParams(LocalStrategy.class, ImmutableMap.of()); + } + + static ReplicationParams simple(int replicationFactor) + { + return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor))); + } + + public void validate(String name) + { + // Attempt to instantiate the ARS, which will throw a ConfigurationException if the options aren't valid. 
+ TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + IEndpointSnitch eps = DatabaseDescriptor.getEndpointSnitch(); + AbstractReplicationStrategy.validateReplicationStrategy(name, klass, tmd, eps, options); + } + + public static ReplicationParams fromMap(Map<String, String> map) + { + Map<String, String> options = new HashMap<>(map); + String className = options.remove(CLASS); + Class<? extends AbstractReplicationStrategy> klass = AbstractReplicationStrategy.getClass(className); + return new ReplicationParams(klass, options); + } + + public Map<String, String> asMap() + { + Map<String, String> map = new HashMap<>(options); + map.put(CLASS, klass.getName()); + return map; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof ReplicationParams)) + return false; + + ReplicationParams r = (ReplicationParams) o; + + return klass.equals(r.klass) && options.equals(r.options); + } + + @Override + public int hashCode() + { + return Objects.hashCode(klass, options); + } + + @Override + public String toString() + { + MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this); + helper.add(CLASS, klass.getName()); + for (Map.Entry<String, String> entry : options.entrySet()) + helper.add(entry.getKey(), entry.getValue()); + return helper.toString(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index ba6a2e1..5791db7 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -31,21 +31,18 @@ import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cache.CachingOptions; import 
org.apache.cassandra.config.*; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.functions.*; -import org.apache.cassandra.cql3.statements.CFPropDefs; +import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -819,20 +816,9 @@ public final class SchemaKeyspace { RowUpdateBuilder adder = new RowUpdateBuilder(Tables, timestamp, mutation).clustering(table.cfName); - adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance()) - .add("comment", table.getComment()) - .add("dclocal_read_repair_chance", table.getDcLocalReadRepairChance()) - .add("default_time_to_live", table.getDefaultTimeToLive()) - .add("gc_grace_seconds", table.getGcGraceSeconds()) - .add("id", table.cfId) - .add("max_index_interval", table.getMaxIndexInterval()) - .add("memtable_flush_period_in_ms", table.getMemtableFlushPeriod()) - .add("min_index_interval", table.getMinIndexInterval()) - .add("read_repair_chance", table.getReadRepairChance()) - .add("speculative_retry", table.getSpeculativeRetry().toString()) - .map("caching", table.getCaching().asMap()) - .map("compaction", buildCompactionMap(table)) - .map("compression", table.compressionParameters().asMap()) + addTableParamsToSchemaMutation(table.params, adder); + + adder.add("id", table.cfId) .set("flags", 
CFMetaData.flagsToStrings(table.flags())) .build(); @@ -852,38 +838,21 @@ public final class SchemaKeyspace } } - /* - * The method is needed - temporarily - to migrate max_compaction_threshold and min_compaction_threshold - * to the compaction map, where they belong. - * - * We must use reflection to validate the options because not every compaction strategy respects and supports - * the threshold params (LCS doesn't, STCS and DTCS don't). - */ - @SuppressWarnings("unchecked") - private static Map<String, String> buildCompactionMap(CFMetaData cfm) + private static void addTableParamsToSchemaMutation(TableParams params, RowUpdateBuilder adder) { - Map<String, String> options = new HashMap<>(cfm.compactionStrategyOptions); - - Map<String, String> optionsWithThresholds = new HashMap<>(options); - options.putIfAbsent(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMinCompactionThreshold())); - options.putIfAbsent(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, Integer.toString(cfm.getMaxCompactionThreshold())); - - try - { - Map<String, String> unrecognizedOptions = (Map<String, String>) cfm.compactionStrategyClass - .getMethod("validateOptions", Map.class) - .invoke(null, optionsWithThresholds); - if (unrecognizedOptions.isEmpty()) - options = optionsWithThresholds; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - - options.put("class", cfm.compactionStrategyClass.getName()); - - return options; + adder.add("bloom_filter_fp_chance", params.bloomFilterFpChance) + .add("comment", params.comment) + .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance) + .add("default_time_to_live", params.defaultTimeToLive) + .add("gc_grace_seconds", params.gcGraceSeconds) + .add("max_index_interval", params.maxIndexInterval) + .add("memtable_flush_period_in_ms", params.memtableFlushPeriodInMs) + .add("min_index_interval", params.minIndexInterval) + .add("read_repair_chance", params.readRepairChance) + .add("speculative_retry", 
params.speculativeRetry.toString()) + .map("caching", params.caching.asMap()) + .map("compaction", params.compaction.asMap()) + .map("compression", params.compression.asMap()); } public static Mutation makeUpdateTableMutation(KeyspaceMetadata keyspace, @@ -1085,49 +1054,38 @@ public final class SchemaKeyspace boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); boolean isDense = flags.contains(CFMetaData.Flag.DENSE); boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); - boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW); - - CFMetaData cfm = CFMetaData.create(keyspace, - table, - id, - isDense, - isCompound, - isSuper, - isCounter, - isMaterializedView, - columns, - DatabaseDescriptor.getPartitioner()); - - Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction")); - Class<? extends AbstractCompactionStrategy> compactionStrategyClass = - CFMetaData.createCompactionStrategy(compaction.remove("class")); - - int minCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD) - ? Integer.parseInt(compaction.get(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD)) - : CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD; - - int maxCompactionThreshold = compaction.containsKey(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD) - ? 
Integer.parseInt(compaction.get(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD)) - : CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD; - - cfm.bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) - .caching(CachingOptions.fromMap(row.getTextMap("caching"))) - .comment(row.getString("comment")) - .compactionStrategyClass(compactionStrategyClass) - .compactionStrategyOptions(compaction) - .compressionParameters(CompressionParameters.fromMap(row.getTextMap("compression"))) - .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) - .defaultTimeToLive(row.getInt("default_time_to_live")) - .gcGraceSeconds(row.getInt("gc_grace_seconds")) - .maxCompactionThreshold(maxCompactionThreshold) - .maxIndexInterval(row.getInt("max_index_interval")) - .memtableFlushPeriod(row.getInt("memtable_flush_period_in_ms")) - .minCompactionThreshold(minCompactionThreshold) - .minIndexInterval(row.getInt("min_index_interval")) - .readRepairChance(row.getDouble("read_repair_chance")) - .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(row.getString("speculative_retry"))); - - return cfm; + boolean isMaterializedView = flags.contains(CFMetaData.Flag.VIEW); + + return CFMetaData.create(keyspace, + table, + id, + isDense, + isCompound, + isSuper, + isCounter, + isMaterializedView, + columns, + DatabaseDescriptor.getPartitioner()) + .params(createTableParamsFromRow(row)); + } + + private static TableParams createTableParamsFromRow(UntypedResultSet.Row row) + { + return TableParams.builder() + .bloomFilterFpChance(row.getDouble("bloom_filter_fp_chance")) + .caching(CachingParams.fromMap(row.getTextMap("caching"))) + .comment(row.getString("comment")) + .compaction(CompactionParams.fromMap(row.getTextMap("compaction"))) + .compression(CompressionParams.fromMap(row.getTextMap("compression"))) + .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) + .defaultTimeToLive(row.getInt("default_time_to_live")) + .gcGraceSeconds(row.getInt("gc_grace_seconds")) + 
.maxIndexInterval(row.getInt("max_index_interval")) + .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) + .minIndexInterval(row.getInt("min_index_interval")) + .readRepairChance(row.getDouble("read_repair_chance")) + .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) + .build(); } /* http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java b/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java new file mode 100644 index 0000000..58c6375 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/SpeculativeRetryParam.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.text.DecimalFormat; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Objects; + +import org.apache.cassandra.exceptions.ConfigurationException; + +import static java.lang.String.format; + +public final class SpeculativeRetryParam +{ + public enum Kind + { + NONE, CUSTOM, PERCENTILE, ALWAYS + } + + public static final SpeculativeRetryParam NONE = none(); + public static final SpeculativeRetryParam ALWAYS = always(); + public static final SpeculativeRetryParam DEFAULT = percentile(99); + + private final Kind kind; + private final double value; + + // pre-processed (divided by 100 for PERCENTILE), multiplied by 1M for CUSTOM (to nanos) + private final double threshold; + + private SpeculativeRetryParam(Kind kind, double value) + { + this.kind = kind; + this.value = value; + + if (kind == Kind.PERCENTILE) + threshold = value / 100; + else if (kind == Kind.CUSTOM) + threshold = TimeUnit.MILLISECONDS.toNanos((long) value); + else + threshold = value; + } + + public Kind kind() + { + return kind; + } + + public double threshold() + { + return threshold; + } + + public static SpeculativeRetryParam none() + { + return new SpeculativeRetryParam(Kind.NONE, 0); + } + + public static SpeculativeRetryParam always() + { + return new SpeculativeRetryParam(Kind.ALWAYS, 0); + } + + public static SpeculativeRetryParam custom(double value) + { + return new SpeculativeRetryParam(Kind.CUSTOM, value); + } + + public static SpeculativeRetryParam percentile(double value) + { + return new SpeculativeRetryParam(Kind.PERCENTILE, value); + } + + public static SpeculativeRetryParam fromString(String value) + { + if (value.toLowerCase().endsWith("ms")) + { + try + { + return custom(Double.parseDouble(value.substring(0, value.length() - "ms".length()))); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(format("Invalid value %s for option '%s'", value, 
TableParams.Option.SPECULATIVE_RETRY)); + } + } + + if (value.toUpperCase().endsWith(Kind.PERCENTILE.toString())) + { + double threshold; + try + { + threshold = Double.parseDouble(value.substring(0, value.length() - Kind.PERCENTILE.toString().length())); + } + catch (IllegalArgumentException e) + { + throw new ConfigurationException(format("Invalid value %s for option '%s'", value, TableParams.Option.SPECULATIVE_RETRY)); + } + + if (threshold >= 0.0 && threshold <= 100.0) + return percentile(threshold); + + throw new ConfigurationException(format("Invalid value %s for PERCENTILE option '%s': must be between 0.0 and 100.0", + value, + TableParams.Option.SPECULATIVE_RETRY)); + } + + if (value.equals(Kind.NONE.toString())) + return NONE; + + if (value.equals(Kind.ALWAYS.toString())) + return ALWAYS; + + throw new ConfigurationException(format("Invalid value %s for option '%s'", value, TableParams.Option.SPECULATIVE_RETRY)); + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof SpeculativeRetryParam)) + return false; + SpeculativeRetryParam srp = (SpeculativeRetryParam) o; + return kind == srp.kind && threshold == srp.threshold; + } + + @Override + public int hashCode() + { + return Objects.hashCode(kind, threshold); + } + + @Override + public String toString() + { + switch (kind) + { + case CUSTOM: + return format("%sms", value); + case PERCENTILE: + return format("%sPERCENTILE", new DecimalFormat("#.#####").format(value)); + default: // NONE and ALWAYS + return kind.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/schema/TableParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java new file mode 100644 index 0000000..3b3a88e --- /dev/null +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@ -0,0 +1,338 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.schema;

import java.util.Locale;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

import org.apache.cassandra.exceptions.ConfigurationException;

import static java.lang.String.format;

/**
 * Value object holding the table-level options enumerated in {@link Option}.
 *
 * <p>All fields are {@code final}; instances are created through {@link Builder}
 * (see {@link #builder()} and {@link #builder(TableParams)}). Construction does not
 * validate anything: call {@link #validate()} to check the values.
 */
public final class TableParams
{
    /** Parameters produced by a {@link Builder} on which no option has been set. */
    public static final TableParams DEFAULT = TableParams.builder().build();

    /**
     * Names of the table options carried by {@link TableParams}.
     */
    public enum Option
    {
        BLOOM_FILTER_FP_CHANCE,
        CACHING,
        COMMENT,
        COMPACTION,
        COMPRESSION,
        DCLOCAL_READ_REPAIR_CHANCE,
        DEFAULT_TIME_TO_LIVE,
        GC_GRACE_SECONDS,
        MAX_INDEX_INTERVAL,
        MEMTABLE_FLUSH_PERIOD_IN_MS,
        MIN_INDEX_INTERVAL,
        READ_REPAIR_CHANCE,
        SPECULATIVE_RETRY;

        /**
         * @return the constant name in lower case, e.g. {@code gc_grace_seconds}
         */
        @Override
        public String toString()
        {
            // Locale.ROOT keeps the result independent of the JVM default locale; with a
            // Turkish/Azeri default locale 'I' would otherwise lower-case to a dotless 'ı'.
            return name().toLowerCase(Locale.ROOT);
        }
    }

    public static final String DEFAULT_COMMENT = "";
    public static final double DEFAULT_READ_REPAIR_CHANCE = 0.0;
    public static final double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1;
    public static final int DEFAULT_GC_GRACE_SECONDS = 864000; // 10 days
    public static final int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
    public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0;
    public static final int DEFAULT_MIN_INDEX_INTERVAL = 128;
    public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;

    public final String comment;
    public final double readRepairChance;
    public final double dcLocalReadRepairChance;
    public final double bloomFilterFpChance;
    public final int gcGraceSeconds;
    public final int defaultTimeToLive;
    public final int memtableFlushPeriodInMs;
    public final int minIndexInterval;
    public final int maxIndexInterval;
    public final SpeculativeRetryParam speculativeRetry;
    public final CachingParams caching;
    public final CompactionParams compaction;
    public final CompressionParams compression;

    /**
     * Copies every option out of {@code builder}.
     *
     * @param builder source of the option values
     */
    private TableParams(Builder builder)
    {
        comment = builder.comment;
        readRepairChance = builder.readRepairChance;
        dcLocalReadRepairChance = builder.dcLocalReadRepairChance;
        // The builder leaves the bloom filter chance null until it is set explicitly;
        // in that case the value is taken from the builder's compaction params.
        bloomFilterFpChance = builder.bloomFilterFpChance == null
                            ? builder.compaction.defaultBloomFilterFbChance()
                            : builder.bloomFilterFpChance;
        gcGraceSeconds = builder.gcGraceSeconds;
        defaultTimeToLive = builder.defaultTimeToLive;
        memtableFlushPeriodInMs = builder.memtableFlushPeriodInMs;
        minIndexInterval = builder.minIndexInterval;
        maxIndexInterval = builder.maxIndexInterval;
        speculativeRetry = builder.speculativeRetry;
        caching = builder.caching;
        compaction = builder.compaction;
        compression = builder.compression;
    }

    /**
     * @return a new builder with every option at its default
     */
    public static Builder builder()
    {
        return new Builder();
    }

    /**
     * Creates a builder pre-populated with every option of {@code params}.
     *
     * <p>The bloom filter chance is copied as an explicit value, so a later call to
     * {@link Builder#compaction(CompactionParams)} on the returned builder does not
     * change it.
     *
     * @param params the parameters to copy
     * @return a new builder holding the same option values as {@code params}
     */
    public static Builder builder(TableParams params)
    {
        return new Builder().bloomFilterFpChance(params.bloomFilterFpChance)
                            .caching(params.caching)
                            .comment(params.comment)
                            .compaction(params.compaction)
                            .compression(params.compression)
                            .dcLocalReadRepairChance(params.dcLocalReadRepairChance)
                            .defaultTimeToLive(params.defaultTimeToLive)
                            .gcGraceSeconds(params.gcGraceSeconds)
                            .maxIndexInterval(params.maxIndexInterval)
                            .memtableFlushPeriodInMs(params.memtableFlushPeriodInMs)
                            .minIndexInterval(params.minIndexInterval)
                            .readRepairChance(params.readRepairChance)
                            .speculativeRetry(params.speculativeRetry);
    }

    /**
     * Validates the compaction and compression params (by delegation) and then the
     * numeric options held directly by this object.
     *
     * @throws ConfigurationException if a numeric option is outside its allowed range
     *         or, for the three chance options, is NaN
     */
    public void validate()
    {
        compaction.validate();
        compression.validate();

        // Every ordered comparison with NaN is false, so the range tests alone would
        // let NaN through; it has to be rejected explicitly.
        if (Double.isNaN(bloomFilterFpChance) || bloomFilterFpChance <= 0 || bloomFilterFpChance > 1)
        {
            fail("%s must be larger than 0.0 and less than or equal to 1.0 (got %s)",
                 Option.BLOOM_FILTER_FP_CHANCE,
                 bloomFilterFpChance);
        }

        if (Double.isNaN(dcLocalReadRepairChance) || dcLocalReadRepairChance < 0 || dcLocalReadRepairChance > 1.0)
        {
            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
                 Option.DCLOCAL_READ_REPAIR_CHANCE,
                 dcLocalReadRepairChance);
        }

        if (Double.isNaN(readRepairChance) || readRepairChance < 0 || readRepairChance > 1.0)
        {
            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
                 Option.READ_REPAIR_CHANCE,
                 readRepairChance);
        }

        if (defaultTimeToLive < 0)
            fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);

        if (gcGraceSeconds < 0)
            fail("%s must be greater than or equal to 0 (got %s)", Option.GC_GRACE_SECONDS, gcGraceSeconds);

        if (minIndexInterval < 1)
            fail("%s must be greater than or equal to 1 (got %s)", Option.MIN_INDEX_INTERVAL, minIndexInterval);

        if (maxIndexInterval < minIndexInterval)
        {
            fail("%s must be greater than or equal to %s (%s) (got %s)",
                 Option.MAX_INDEX_INTERVAL,
                 Option.MIN_INDEX_INTERVAL,
                 minIndexInterval,
                 maxIndexInterval);
        }

        if (memtableFlushPeriodInMs < 0)
            fail("%s must be greater than or equal to 0 (got %s)", Option.MEMTABLE_FLUSH_PERIOD_IN_MS, memtableFlushPeriodInMs);
    }

    /**
     * Throws a {@link ConfigurationException} whose message is {@code template}
     * formatted with {@code args}.
     *
     * @param template a {@link String#format(String, Object...)} pattern
     * @param args     the values substituted into {@code template}
     */
    private static void fail(String template, Object... args)
    {
        throw new ConfigurationException(format(template, args));
    }

    /**
     * @return {@code true} if {@code o} is a {@code TableParams} whose options all
     *         equal this instance's options
     */
    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;

        if (!(o instanceof TableParams))
            return false;

        TableParams p = (TableParams) o;

        return comment.equals(p.comment)
            && readRepairChance == p.readRepairChance
            && dcLocalReadRepairChance == p.dcLocalReadRepairChance
            && bloomFilterFpChance == p.bloomFilterFpChance
            && gcGraceSeconds == p.gcGraceSeconds
            && defaultTimeToLive == p.defaultTimeToLive
            && memtableFlushPeriodInMs == p.memtableFlushPeriodInMs
            && minIndexInterval == p.minIndexInterval
            && maxIndexInterval == p.maxIndexInterval
            && speculativeRetry.equals(p.speculativeRetry)
            && caching.equals(p.caching)
            && compaction.equals(p.compaction)
            && compression.equals(p.compression);
    }

    /**
     * @return a hash computed over the same fields that {@link #equals(Object)} compares
     */
    @Override
    public int hashCode()
    {
        return Objects.hashCode(comment,
                                readRepairChance,
                                dcLocalReadRepairChance,
                                bloomFilterFpChance,
                                gcGraceSeconds,
                                defaultTimeToLive,
                                memtableFlushPeriodInMs,
                                minIndexInterval,
                                maxIndexInterval,
                                speculativeRetry,
                                caching,
                                compaction,
                                compression);
    }

    /**
     * @return a string listing every option, keyed by its {@link Option} name
     */
    @Override
    public String toString()
    {
        return MoreObjects.toStringHelper(this)
                          .add(Option.COMMENT.toString(), comment)
                          .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance)
                          .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance)
                          .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance)
                          .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds)
                          .add(Option.DEFAULT_TIME_TO_LIVE.toString(), defaultTimeToLive)
                          .add(Option.MEMTABLE_FLUSH_PERIOD_IN_MS.toString(), memtableFlushPeriodInMs)
                          .add(Option.MIN_INDEX_INTERVAL.toString(), minIndexInterval)
                          .add(Option.MAX_INDEX_INTERVAL.toString(), maxIndexInterval)
                          .add(Option.SPECULATIVE_RETRY.toString(), speculativeRetry)
                          .add(Option.CACHING.toString(), caching)
                          .add(Option.COMPACTION.toString(), compaction)
                          .add(Option.COMPRESSION.toString(), compression)
                          .toString();
    }

    /**
     * Mutable builder for {@link TableParams}. Every option starts at its default;
     * each setter stores the given value and returns this builder for chaining.
     */
    public static final class Builder
    {
        private String comment = DEFAULT_COMMENT;
        private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
        private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
        // Boxed so that "not set" (null) is distinguishable from an explicit value;
        // the TableParams constructor resolves null from the compaction params.
        private Double bloomFilterFpChance;
        private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
        private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
        private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS;
        private int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL;
        private int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL;
        private SpeculativeRetryParam speculativeRetry = SpeculativeRetryParam.DEFAULT;
        private CachingParams caching = CachingParams.DEFAULT;
        private CompactionParams compaction = CompactionParams.DEFAULT;
        private CompressionParams compression = CompressionParams.DEFAULT;

        public Builder()
        {
        }

        /**
         * @return a new {@link TableParams} holding this builder's current values
         */
        public TableParams build()
        {
            return new TableParams(this);
        }

        public Builder comment(String val)
        {
            comment = val;
            return this;
        }

        public Builder readRepairChance(double val)
        {
            readRepairChance = val;
            return this;
        }

        public Builder dcLocalReadRepairChance(double val)
        {
            dcLocalReadRepairChance = val;
            return this;
        }

        public Builder bloomFilterFpChance(double val)
        {
            bloomFilterFpChance = val;
            return this;
        }

        public Builder gcGraceSeconds(int val)
        {
            gcGraceSeconds = val;
            return this;
        }

        public Builder defaultTimeToLive(int val)
        {
            defaultTimeToLive = val;
            return this;
        }

        public Builder memtableFlushPeriodInMs(int val)
        {
            memtableFlushPeriodInMs = val;
            return this;
        }

        public Builder minIndexInterval(int val)
        {
            minIndexInterval = val;
            return this;
        }

        public Builder maxIndexInterval(int val)
        {
            maxIndexInterval = val;
            return this;
        }

        public Builder speculativeRetry(SpeculativeRetryParam val)
        {
            speculativeRetry = val;
            return this;
        }

        public Builder caching(CachingParams val)
        {
            caching = val;
            return this;
        }

        public Builder compaction(CompactionParams val)
        {
            compaction = val;
            return this;
        }

        public Builder compression(CompressionParams val)
        {
            compression = val;
            return this;
        }
    }
}
