Repository: cassandra Updated Branches: refs/heads/trunk e1e692a75 -> e06d411f6
Make LZ4 compression level configurable Patch by mkjellman; reviewed by marcuse for CASSANDRA-11051 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e06d411f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e06d411f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e06d411f Branch: refs/heads/trunk Commit: e06d411f6a6a459503f09fe80208d1ee261ed772 Parents: e1e692a Author: Michael Kjellman <[email protected]> Authored: Fri Feb 19 09:47:44 2016 +0100 Committer: Marcus Eriksson <[email protected]> Committed: Fri Apr 8 08:09:36 2016 +0200 ---------------------------------------------------------------------- .../cassandra/io/compress/LZ4Compressor.java | 106 +++++++++++++++++-- .../cassandra/schema/CompressionParams.java | 6 +- .../io/compress/CompressorPerformance.java | 3 +- .../db/commitlog/SegmentReaderTest.java | 3 +- .../io/compress/CQLCompressionTest.java | 56 ++++++++++ .../CompressedSequentialWriterTest.java | 7 +- 6 files changed, 166 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java index 3a3b024..3fd889e 100644 --- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java +++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java @@ -23,31 +23,80 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; -import org.apache.cassandra.schema.CompressionParams; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.Pair; public class LZ4Compressor implements ICompressor { + private static final Logger logger = LoggerFactory.getLogger(LZ4Compressor.class); + + public static final String LZ4_FAST_COMPRESSOR = "fast"; + public static final String LZ4_HIGH_COMPRESSOR = "high"; + private static final Set<String> VALID_COMPRESSOR_TYPES = new HashSet<>(Arrays.asList(LZ4_FAST_COMPRESSOR, LZ4_HIGH_COMPRESSOR)); + + private static final int DEFAULT_HIGH_COMPRESSION_LEVEL = 9; + private static final String DEFAULT_LZ4_COMPRESSOR_TYPE = LZ4_FAST_COMPRESSOR; + + public static final String LZ4_HIGH_COMPRESSION_LEVEL = "lz4_high_compressor_level"; + public static final String LZ4_COMPRESSOR_TYPE = "lz4_compressor_type"; + private static final int INTEGER_BYTES = 4; - @VisibleForTesting - public static final LZ4Compressor instance = new LZ4Compressor(); + private static final ConcurrentHashMap<Pair<String, Integer>, LZ4Compressor> instances = new ConcurrentHashMap<>(); - public static LZ4Compressor create(Map<String, String> args) + public static LZ4Compressor create(Map<String, String> args) throws ConfigurationException { + String compressorType = validateCompressorType(args.get(LZ4_COMPRESSOR_TYPE)); + Integer compressionLevel = validateCompressionLevel(args.get(LZ4_HIGH_COMPRESSION_LEVEL)); + + Pair<String, Integer> compressorTypeAndLevel = Pair.create(compressorType, compressionLevel); + LZ4Compressor instance = instances.get(compressorTypeAndLevel); + if (instance == null) + { + if (compressorType.equals(LZ4_FAST_COMPRESSOR) && args.get(LZ4_HIGH_COMPRESSION_LEVEL) != null) + logger.warn("'{}' parameter is ignored when '{}' is '{}'", LZ4_HIGH_COMPRESSION_LEVEL, LZ4_COMPRESSOR_TYPE, LZ4_FAST_COMPRESSOR); + instance = new LZ4Compressor(compressorType, compressionLevel); + LZ4Compressor instanceFromMap = instances.putIfAbsent(compressorTypeAndLevel, instance); + if(instanceFromMap != null) + instance = instanceFromMap; + } return instance; } private final net.jpountz.lz4.LZ4Compressor compressor; private final net.jpountz.lz4.LZ4FastDecompressor decompressor; + @VisibleForTesting + final String compressorType; + @VisibleForTesting + final Integer compressionLevel; - private LZ4Compressor() + private LZ4Compressor(String type, Integer compressionLevel) { + this.compressorType = type; + this.compressionLevel = compressionLevel; final LZ4Factory lz4Factory = LZ4Factory.fastestInstance(); - compressor = lz4Factory.fastCompressor(); + switch (type) + { + case LZ4_HIGH_COMPRESSOR: + { + compressor = lz4Factory.highCompressor(compressionLevel); + break; + } + case LZ4_FAST_COMPRESSOR: + default: + { + compressor = lz4Factory.fastCompressor(); + } + } + decompressor = lz4Factory.fastDecompressor(); } @@ -127,7 +176,50 @@ public class LZ4Compressor implements ICompressor public Set<String> supportedOptions() { - return new HashSet<>(); + return new HashSet<>(Arrays.asList(LZ4_HIGH_COMPRESSION_LEVEL, LZ4_COMPRESSOR_TYPE)); + } + + public static String validateCompressorType(String compressorType) throws ConfigurationException + { + if (compressorType == null) + return DEFAULT_LZ4_COMPRESSOR_TYPE; + + if (!VALID_COMPRESSOR_TYPES.contains(compressorType)) + { + throw new ConfigurationException(String.format("Invalid compressor type '%s' specified for LZ4 parameter '%s'. " + + "Valid options are %s.", compressorType, LZ4_COMPRESSOR_TYPE, + VALID_COMPRESSOR_TYPES.toString())); + } + else + { + return compressorType; + } + } + + public static Integer validateCompressionLevel(String compressionLevel) throws ConfigurationException + { + if (compressionLevel == null) + return DEFAULT_HIGH_COMPRESSION_LEVEL; + + ConfigurationException ex = new ConfigurationException("Invalid value [" + compressionLevel + "] for parameter '" + + LZ4_HIGH_COMPRESSION_LEVEL + "'. Value must be between 1 and 17."); + + Integer level; + try + { + level = Integer.parseInt(compressionLevel); + } + catch (NumberFormatException e) + { + throw ex; + } + + if (level < 1 || level > 17) + { + throw ex; + } + + return level; } public BufferType preferredBufferType() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/src/java/org/apache/cassandra/schema/CompressionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index cd1686f..bd10f75 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -56,7 +56,7 @@ public final class CompressionParams public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb"; public static final String ENABLED = "enabled"; - public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.instance, + public static final CompressionParams DEFAULT = new CompressionParams(LZ4Compressor.create(Collections.<String, String>emptyMap()), DEFAULT_CHUNK_LENGTH, Collections.emptyMap()); @@ -139,7 +139,7 @@ public final class CompressionParams public static CompressionParams lz4(Integer chunkLength) { - return new CompressionParams(LZ4Compressor.instance, chunkLength, Collections.emptyMap()); + return new CompressionParams(LZ4Compressor.create(Collections.emptyMap()), chunkLength, Collections.emptyMap()); } public CompressionParams(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException @@ -272,7 +272,7 @@ public final class CompressionParams private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co) { if (co == null || co.isEmpty()) - return Collections.<String, String>emptyMap(); + return Collections.emptyMap(); Map<String, String> compressionOptions = new HashMap<>(); for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java index 5b23bea..db190bc 100644 --- a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java +++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java @@ -3,6 +3,7 @@ package org.apache.cassandra.io.compress; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; public class CompressorPerformance @@ -13,7 +14,7 @@ public class CompressorPerformance for (ICompressor compressor: new ICompressor[] { SnappyCompressor.instance, // warm up DeflateCompressor.instance, - LZ4Compressor.instance, + LZ4Compressor.create(Collections.emptyMap()), SnappyCompressor.instance }) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java index 04e471d..3ec0db2 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/SegmentReaderTest.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.Collections; import java.util.Random; import javax.crypto.Cipher; @@ -52,7 +53,7 @@ public class SegmentReaderTest @Test public void compressedSegmenter_LZ4() throws IOException { - compressedSegmenter(LZ4Compressor.create(null)); + compressedSegmenter(LZ4Compressor.create(Collections.emptyMap())); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java new file mode 100644 index 0000000..a2aff2f --- /dev/null +++ b/test/unit/org/apache/cassandra/io/compress/CQLCompressionTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.exceptions.ConfigurationException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class CQLCompressionTest extends CQLTester +{ + @Test + public void lz4ParamsTest() + { + createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_high_compressor_level':3}"); + assertTrue(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType.equals(LZ4Compressor.LZ4_FAST_COMPRESSOR)); + createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 'lz4_high_compressor_level':13}"); + assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_HIGH_COMPRESSOR); + assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel, (Integer)13); + createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor'}"); + assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressorType, LZ4Compressor.LZ4_FAST_COMPRESSOR); + assertEquals(((LZ4Compressor)getCurrentColumnFamilyStore().metadata.params.compression.getSstableCompressor()).compressionLevel, (Integer)9); + } + + @Test(expected = ConfigurationException.class) + public void lz4BadParamsTest() throws Throwable + { + try + { + createTable("create table %s (id int primary key, uh text) with compression = {'class':'LZ4Compressor', 'lz4_compressor_type':'high', 'lz4_high_compressor_level':113}"); + } + catch (RuntimeException e) + { + throw e.getCause(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e06d411f/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index 9b09f0b..66a0e28 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -182,6 +182,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest offsetsFile.getPath(), CompressionParams.lz4(BUFFER_SIZE), new MetadataCollector(new ClusteringComparator(UTF8Type.instance)))); + } private TestableCSW(File file, File offsetsFile, CompressedSequentialWriter sw) throws IOException @@ -196,7 +197,7 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest Assert.assertFalse(offsetsFile.exists()); byte[] compressed = readFileToByteArray(file); byte[] uncompressed = new byte[partialContents.length]; - LZ4Compressor.instance.uncompress(compressed, 0, compressed.length - 4, uncompressed, 0); + LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, 0, compressed.length - 4, uncompressed, 0); Assert.assertTrue(Arrays.equals(partialContents, uncompressed)); } @@ -214,8 +215,8 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest int offset = (int) offsets.readLong(); byte[] compressed = readFileToByteArray(file); byte[] uncompressed = new byte[fullContents.length]; - LZ4Compressor.instance.uncompress(compressed, 0, offset - 4, uncompressed, 0); - LZ4Compressor.instance.uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length); + LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, 0, offset - 4, uncompressed, 0); + LZ4Compressor.create(Collections.<String, String>emptyMap()).uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length); Assert.assertTrue(Arrays.equals(fullContents, uncompressed)); }
