Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 8bafc180b -> c20566fa6
  refs/heads/trunk 02c92dfce -> bb25f5bdd
Implement hints compression

Patch by bdeggleston; reviewed by jmckenzie for CASSANDRA-9428

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c20566fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c20566fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c20566fa

Branch: refs/heads/cassandra-3.0
Commit: c20566fa64031dd30a1f731eee1394264977eb6f
Parents: 8bafc18
Author: Blake Eggleston <[email protected]>
Authored: Mon Dec 14 16:09:55 2015 -0800
Committer: Joshua McKenzie <[email protected]>
Committed: Thu Dec 24 07:16:07 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 3 +-
 NEWS.txt | 9 ++
 conf/cassandra.yaml | 8 +
 .../org/apache/cassandra/config/Config.java | 1 +
 .../cassandra/config/DatabaseDescriptor.java | 10 ++
 .../cassandra/config/ParameterizedClass.java | 9 +-
 .../cassandra/hints/ChecksummedDataInput.java | 15 +-
 .../hints/CompressedChecksummedDataInput.java | 158 +++++++++++++++++++
 .../cassandra/hints/CompressedHintsWriter.java | 67 ++++++++
 .../apache/cassandra/hints/HintsCatalog.java | 19 ++-
 .../apache/cassandra/hints/HintsDescriptor.java | 37 +++++
 .../org/apache/cassandra/hints/HintsReader.java | 22 ++-
 .../apache/cassandra/hints/HintsService.java | 32 +++-
 .../org/apache/cassandra/hints/HintsStore.java | 11 +-
 .../cassandra/hints/HintsWriteExecutor.java | 2 +-
 .../org/apache/cassandra/hints/HintsWriter.java | 60 +++++--
 .../cassandra/hints/HintsCatalogTest.java | 2 +-
 .../cassandra/hints/HintsCompressionTest.java | 157 ++++++++++++++++++
 .../hints/LegacyHintsMigratorTest.java | 4 +-
 19 files changed, 582 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index
a669b17..db286b9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 3.0.3 - * Fix potential assertion error when reading static columns (CASSANDRA-0903) + * Implement hints compression (CASSANDRA-9428) + * Fix potential assertion error when reading static columns (CASSANDRA-10903) * Avoid NoSuchElementException when executing empty batch (CASSANDRA-10711) * Avoid building PartitionUpdate in toString (CASSANDRA-10897) * Reduce heap spent when receiving many SSTables (CASSANDRA-10797) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index b4f1eaf..8a03e14 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,15 @@ restore snapshots created with the previous major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +3.0.3 +===== + +New features +------------ + - Hinted handoff now supports compression. Reference cassandra.yaml:hints_compression. + Note: hints compression is currently disabled by default. + + 3.0.1 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 21fb22d..74e1d1d 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -77,6 +77,14 @@ hints_flush_period_in_ms: 10000 # Maximum size for a single hints file, in megabytes. max_hints_file_size_in_mb: 128 +# Compression to apply to the hint files. If omitted, hints files +# will be written uncompressed. LZ4, Snappy, and Deflate compressors +# are supported. +#hints_compression: +# - class_name: LZ4Compressor +# parameters: +# - + # Maximum throttle in KBs per second, total. This will be # reduced proportionally to the number of nodes in the cluster. 
batchlog_replay_throttle_in_kb: 1024 http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index b1b0dff..7154ba3 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -202,6 +202,7 @@ public class Config public int max_hints_delivery_threads = 2; public int hints_flush_period_in_ms = 10000; public int max_hints_file_size_in_mb = 128; + public ParameterizedClass hints_compression; public int sstable_preemptive_open_interval_in_mb = 50; public volatile boolean incremental_backups = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index fc77977..c903775 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1623,6 +1623,16 @@ public class DatabaseDescriptor return conf.max_hints_file_size_in_mb * 1024L * 1024L; } + public static ParameterizedClass getHintsCompression() + { + return conf.hints_compression; + } + + public static void setHintsCompression(ParameterizedClass parameterizedClass) + { + conf.hints_compression = parameterizedClass; + } + public static boolean isIncrementalBackupsEnabled() { return conf.incremental_backups; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/config/ParameterizedClass.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/config/ParameterizedClass.java b/src/java/org/apache/cassandra/config/ParameterizedClass.java index c7614de..6c7996a 100644 --- a/src/java/org/apache/cassandra/config/ParameterizedClass.java +++ b/src/java/org/apache/cassandra/config/ParameterizedClass.java @@ -17,14 +17,17 @@ */ package org.apache.cassandra.config; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import com.google.common.base.Objects; +import com.google.common.collect.ImmutableMap; public class ParameterizedClass { + public static final String CLASS_NAME = "class_name"; + public static final String PARAMETERS = "parameters"; + public String class_name; public Map<String, String> parameters; @@ -37,8 +40,8 @@ public class ParameterizedClass @SuppressWarnings("unchecked") public ParameterizedClass(Map<String, ?> p) { - this((String)p.get("class_name"), - p.containsKey("parameters") ? (Map<String, String>)((List<?>)p.get("parameters")).get(0) : null); + this((String)p.get(CLASS_NAME), + p.containsKey(PARAMETERS) ? 
(Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java index d5b8ae0..1dc6d1e 100644 --- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java @@ -22,9 +22,13 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.zip.CRC32; +import org.apache.cassandra.io.FSError; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.schema.CompressionParams; /** * A {@link RandomAccessReader} wrapper that calctulates the CRC in place. @@ -37,7 +41,7 @@ import org.apache.cassandra.io.util.RandomAccessReader; * corrupted sequence by reading a huge corrupted length of bytes via * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}. 
*/ -public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel +public class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel { private final CRC32 crc; private int crcPosition; @@ -46,7 +50,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR private long limit; private FileMark limitMark; - private ChecksummedDataInput(Builder builder) + protected ChecksummedDataInput(Builder builder) { super(builder); @@ -63,6 +67,11 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR return new Builder(new ChannelProxy(file)).build(); } + protected void releaseBuffer() + { + super.releaseBuffer(); + } + public void resetCrc() { crc.reset(); @@ -150,7 +159,7 @@ public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessR crc.update(unprocessed); } - public final static class Builder extends RandomAccessReader.Builder + public static class Builder extends RandomAccessReader.Builder { public Builder(ChannelProxy channel) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java new file mode 100644 index 0000000..1009b57 --- /dev/null +++ b/src/java/org/apache/cassandra/hints/CompressedChecksummedDataInput.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.hints; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.utils.memory.BufferPool; + +public final class CompressedChecksummedDataInput extends ChecksummedDataInput +{ + private final ICompressor compressor; + private volatile long filePosition = 0; + private volatile ByteBuffer compressedBuffer = null; + private final ByteBuffer metadataBuffer = ByteBuffer.allocate(CompressedHintsWriter.METADATA_SIZE); + + public CompressedChecksummedDataInput(Builder builder) + { + super(builder); + assert regions == null; //mmapped regions are not supported + + compressor = builder.compressor; + filePosition = builder.position; + } + + /** + * Since an entire block of compressed data is read off of disk, not just a hint at a time, + * we don't report EOF until the decompressed data has also been read completely + */ + public boolean isEOF() + { + return filePosition == channel.size() && buffer.remaining() == 0; + } + + protected void reBufferStandard() + { + metadataBuffer.clear(); + channel.read(metadataBuffer, filePosition); + filePosition += CompressedHintsWriter.METADATA_SIZE; + metadataBuffer.rewind(); + + int uncompressedSize = metadataBuffer.getInt(); + int compressedSize = metadataBuffer.getInt(); + + if (compressedBuffer == null || compressedSize > compressedBuffer.capacity()) + { + int bufferSize = compressedSize + (compressedSize / 
20); // allocate +5% to cover variability in compressed size + if (compressedBuffer != null) + { + BufferPool.put(compressedBuffer); + } + compressedBuffer = allocateBuffer(bufferSize, compressor.preferredBufferType()); + } + + compressedBuffer.clear(); + compressedBuffer.limit(compressedSize); + channel.read(compressedBuffer, filePosition); + compressedBuffer.rewind(); + filePosition += compressedSize; + + bufferOffset += buffer.position(); + if (buffer.capacity() < uncompressedSize) + { + int bufferSize = uncompressedSize + (uncompressedSize / 20); + BufferPool.put(buffer); + buffer = allocateBuffer(bufferSize, compressor.preferredBufferType()); + } + + buffer.clear(); + buffer.limit(uncompressedSize); + try + { + compressor.uncompress(compressedBuffer, buffer); + buffer.flip(); + } + catch (IOException e) + { + throw new FSReadError(e, getPath()); + } + } + + protected void releaseBuffer() + { + super.releaseBuffer(); + if (compressedBuffer != null) + { + BufferPool.put(compressedBuffer); + compressedBuffer = null; + } + } + + protected void reBufferMmap() + { + throw new UnsupportedOperationException(); + } + + public static final class Builder extends ChecksummedDataInput.Builder + { + private long position; + private ICompressor compressor; + + public Builder(ChannelProxy channel) + { + super(channel); + bufferType = null; + } + + public CompressedChecksummedDataInput build() + { + assert position >= 0; + assert compressor != null; + return new CompressedChecksummedDataInput(this); + } + + public Builder withCompressor(ICompressor compressor) + { + this.compressor = compressor; + bufferType = compressor.preferredBufferType(); + return this; + } + + public Builder withPosition(long position) + { + this.position = position; + return this; + } + } + + public static final CompressedChecksummedDataInput upgradeInput(ChecksummedDataInput input, ICompressor compressor) + { + long position = input.getPosition(); + input.close(); + + Builder builder = new Builder(new 
ChannelProxy(input.getPath())); + builder.withPosition(position); + builder.withCompressor(compressor); + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java new file mode 100644 index 0000000..491dceb --- /dev/null +++ b/src/java/org/apache/cassandra/hints/CompressedHintsWriter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.hints; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.zip.CRC32; + +import org.apache.cassandra.io.compress.ICompressor; + +public class CompressedHintsWriter extends HintsWriter +{ + // compressed and uncompressed size is stored at the beginning of each compressed block + static final int METADATA_SIZE = 8; + + private final ICompressor compressor; + + private volatile ByteBuffer compressionBuffer = null; + + public CompressedHintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC) + { + super(directory, descriptor, file, channel, fd, globalCRC); + compressor = descriptor.createCompressor(); + assert compressor != null; + } + + protected void writeBuffer(ByteBuffer bb) throws IOException + { + int originalSize = bb.remaining(); + int estimatedSize = compressor.initialCompressedBufferLength(originalSize) + METADATA_SIZE; + + if (compressionBuffer == null || compressionBuffer.capacity() < estimatedSize) + { + compressionBuffer = compressor.preferredBufferType().allocate(estimatedSize); + } + compressionBuffer.clear(); + + compressionBuffer.position(METADATA_SIZE); + compressor.compress(bb, compressionBuffer); + int compressedSize = compressionBuffer.position() - METADATA_SIZE; + + compressionBuffer.rewind(); + compressionBuffer.putInt(originalSize); + compressionBuffer.putInt(compressedSize); + compressionBuffer.rewind(); + compressionBuffer.limit(compressedSize + METADATA_SIZE); + super.writeBuffer(compressionBuffer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsCatalog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java b/src/java/org/apache/cassandra/hints/HintsCatalog.java index cb8e1fd..c2f0972 100644 --- 
a/src/java/org/apache/cassandra/hints/HintsCatalog.java +++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java @@ -24,6 +24,8 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; +import com.google.common.collect.ImmutableMap; + import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.SyncUtil; @@ -37,20 +39,22 @@ final class HintsCatalog { private final File hintsDirectory; private final Map<UUID, HintsStore> stores; + private final ImmutableMap<String, Object> writerParams; - private HintsCatalog(File hintsDirectory, Map<UUID, List<HintsDescriptor>> descriptors) + private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors) { this.hintsDirectory = hintsDirectory; + this.writerParams = writerParams; this.stores = new ConcurrentHashMap<>(); for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet()) - stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, entry.getValue())); + stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue())); } /** * Loads hints stores from a given directory. */ - static HintsCatalog load(File hintsDirectory) + static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams) { try { @@ -59,7 +63,7 @@ final class HintsCatalog .filter(HintsDescriptor::isHintFileName) .map(HintsDescriptor::readFromFile) .collect(groupingBy(h -> h.hostId)); - return new HintsCatalog(hintsDirectory, stores); + return new HintsCatalog(hintsDirectory, writerParams, stores); } catch (IOException e) { @@ -84,7 +88,7 @@ final class HintsCatalog // and in this case would also allocate for the capturing lambda; the method is on a really hot path HintsStore store = stores.get(hostId); return store == null - ? 
stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, Collections.emptyList())) + ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList())) : store; } @@ -133,4 +137,9 @@ final class HintsCatalog CLibrary.tryCloseFD(fd); } } + + ImmutableMap<String, Object> getWriterParams() + { + return writerParams; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index 9c27a23..f5296b3 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -31,10 +31,13 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.CompressionParams; import org.json.simple.JSONValue; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; @@ -50,6 +53,8 @@ final class HintsDescriptor static final int VERSION_30 = 1; static final int CURRENT_VERSION = VERSION_30; + static final String COMPRESSION = "compression"; + static final Pattern pattern = Pattern.compile("^[a-fA-F0-9]{8}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{4}\\-[a-fA-F0-9]{12}\\-(\\d+)\\-(\\d+)\\.hints$"); @@ -59,6 +64,7 @@ final class HintsDescriptor // implemented for future compression support - see CASSANDRA-9428 final ImmutableMap<String, Object> parameters; + final 
ParameterizedClass compressionConfig; HintsDescriptor(UUID hostId, int version, long timestamp, ImmutableMap<String, Object> parameters) { @@ -66,6 +72,12 @@ final class HintsDescriptor this.version = version; this.timestamp = timestamp; this.parameters = parameters; + compressionConfig = createCompressionConfig(parameters); + } + + HintsDescriptor(UUID hostId, long timestamp, ImmutableMap<String, Object> parameters) + { + this(hostId, CURRENT_VERSION, timestamp, parameters); } HintsDescriptor(UUID hostId, long timestamp) @@ -73,6 +85,21 @@ final class HintsDescriptor this(hostId, CURRENT_VERSION, timestamp, ImmutableMap.<String, Object>of()); } + @SuppressWarnings("unchecked") + static ParameterizedClass createCompressionConfig(Map<String, Object> params) + { + if (params.containsKey(COMPRESSION)) + { + Map<String, Object> compressorConfig = (Map<String, Object>) params.get(COMPRESSION); + return new ParameterizedClass((String) compressorConfig.get(ParameterizedClass.CLASS_NAME), + (Map<String, String>) compressorConfig.get(ParameterizedClass.PARAMETERS)); + } + else + { + return null; + } + } + String fileName() { return String.format("%s-%s-%s.hints", hostId, timestamp, version); @@ -116,6 +143,16 @@ final class HintsDescriptor } } + public boolean isCompressed() + { + return compressionConfig != null; + } + + public ICompressor createCompressor() + { + return isCompressed() ? 
CompressionParams.createCompressor(compressionConfig) : null; + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java index 67bb4f6..fe2b57a 100644 --- a/src/java/org/apache/cassandra/hints/HintsReader.java +++ b/src/java/org/apache/cassandra/hints/HintsReader.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.io.FSReadError; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.CLibrary; @@ -48,7 +47,7 @@ import org.apache.cassandra.utils.CLibrary; * The latter is required for dispatch of hints to nodes that have a different messaging version, and in general is just an * easy way to enable backward and future compatibilty. 
*/ -final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> +class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { private static final Logger logger = LoggerFactory.getLogger(HintsReader.class); @@ -63,7 +62,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> @Nullable private final RateLimiter rateLimiter; - private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter) + protected HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter) { this.descriptor = descriptor; this.file = file; @@ -78,6 +77,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> try { HintsDescriptor descriptor = HintsDescriptor.deserialize(reader); + if (descriptor.isCompressed()) + { + // since the hints descriptor is always uncompressed, it needs to be read with the normal ChecksummedDataInput. + // The compressed input is instantiated with the uncompressed input's position + reader = CompressedChecksummedDataInput.upgradeInput(reader, descriptor.createCompressor()); + } return new HintsReader(descriptor, file, reader, rateLimiter); } catch (IOException e) @@ -112,6 +117,11 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> return new PagesIterator(); } + public ChecksummedDataInput getInput() + { + return input; + } + final class Page { public final long offset; @@ -139,7 +149,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath()); - if (input.length() == input.getFilePointer()) + if (input.isEOF()) return endOfData(); return new Page(input.getFilePointer()); @@ -167,7 +177,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { long position = input.getFilePointer(); - if (input.length() == position) + 
if (input.isEOF()) return endOfData(); // reached EOF if (position - offset >= PAGE_SIZE) @@ -257,7 +267,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page> { long position = input.getFilePointer(); - if (input.length() == position) + if (input.isEOF()) return endOfData(); // reached EOF if (position - offset >= PAGE_SIZE) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 6aed07f..5001af4 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; +import java.util.Map; import java.util.UUID; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -30,13 +31,16 @@ import java.util.function.Supplier; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.metrics.HintedHandoffMetrics; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.service.StorageService; import static com.google.common.collect.Iterables.transform; @@ -60,6 +64,7 @@ public final class HintsService implements HintsServiceMBean private static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService"; private static final int MIN_BUFFER_SIZE = 
32 << 20; + static final ImmutableMap<String, Object> EMPTY_PARAMS = ImmutableMap.of(); private final HintsCatalog catalog; private final HintsWriteExecutor writeExecutor; @@ -79,7 +84,7 @@ public final class HintsService implements HintsServiceMBean File hintsDirectory = DatabaseDescriptor.getHintsDirectory(); int maxDeliveryThreads = DatabaseDescriptor.getMaxHintsDeliveryThreads(); - catalog = HintsCatalog.load(hintsDirectory); + catalog = HintsCatalog.load(hintsDirectory, createDescriptorParams()); writeExecutor = new HintsWriteExecutor(catalog); int bufferSize = Math.max(DatabaseDescriptor.getMaxMutationSize() * 2, MIN_BUFFER_SIZE); @@ -97,6 +102,26 @@ public final class HintsService implements HintsServiceMBean metrics = new HintedHandoffMetrics(); } + private static ImmutableMap<String, Object> createDescriptorParams() + { + ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); + + ParameterizedClass compressionConfig = DatabaseDescriptor.getHintsCompression(); + if (compressionConfig != null) + { + ImmutableMap.Builder<String, Object> compressorParams = ImmutableMap.builder(); + + compressorParams.put(ParameterizedClass.CLASS_NAME, compressionConfig.class_name); + if (compressionConfig.parameters != null) + { + compressorParams.put(ParameterizedClass.PARAMETERS, compressionConfig.parameters); + } + builder.put(HintsDescriptor.COMPRESSION, compressorParams.build()); + } + + return builder.build(); + } + public void registerMBean() { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -323,4 +348,9 @@ public final class HintsService implements HintsServiceMBean return dispatchExecutor.transfer(catalog, hostIdSupplier); } + + HintsCatalog getCatalog() + { + return catalog; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java 
b/src/java/org/apache/cassandra/hints/HintsStore.java index e19de99..0fe582f 100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; +import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ final class HintsStore public final UUID hostId; private final File hintsDirectory; + private final ImmutableMap<String, Object> writerParams; private final Map<HintsDescriptor, Long> dispatchOffsets; private final Deque<HintsDescriptor> dispatchDequeue; @@ -55,10 +57,11 @@ final class HintsStore private volatile long lastUsedTimestamp; private volatile HintsWriter hintsWriter; - private HintsStore(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors) + private HintsStore(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors) { this.hostId = hostId; this.hintsDirectory = hintsDirectory; + this.writerParams = writerParams; dispatchOffsets = new ConcurrentHashMap<>(); dispatchDequeue = new ConcurrentLinkedDeque<>(descriptors); @@ -68,10 +71,10 @@ final class HintsStore lastUsedTimestamp = descriptors.stream().mapToLong(d -> d.timestamp).max().orElse(0L); } - static HintsStore create(UUID hostId, File hintsDirectory, List<HintsDescriptor> descriptors) + static HintsStore create(UUID hostId, File hintsDirectory, ImmutableMap<String, Object> writerParams, List<HintsDescriptor> descriptors) { descriptors.sort((d1, d2) -> Long.compare(d1.timestamp, d2.timestamp)); - return new HintsStore(hostId, hintsDirectory, descriptors); + return new HintsStore(hostId, hintsDirectory, writerParams, descriptors); } InetAddress address() @@ -179,7 +182,7 @@ final class HintsStore private HintsWriter openWriter() { lastUsedTimestamp = 
Math.max(System.currentTimeMillis(), lastUsedTimestamp + 1); - HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp); + HintsDescriptor descriptor = new HintsDescriptor(hostId, lastUsedTimestamp, writerParams); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java index 932f1c7..098573a 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java +++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java @@ -35,7 +35,7 @@ import org.apache.cassandra.io.FSWriteError; */ final class HintsWriteExecutor { - private static final int WRITE_BUFFER_SIZE = 256 << 10; + static final int WRITE_BUFFER_SIZE = 256 << 10; private final HintsCatalog catalog; private final ByteBuffer writeBuffer; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/src/java/org/apache/cassandra/hints/HintsWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsWriter.java b/src/java/org/apache/cassandra/hints/HintsWriter.java index 64520b9..8836258 100644 --- a/src/java/org/apache/cassandra/hints/HintsWriter.java +++ b/src/java/org/apache/cassandra/hints/HintsWriter.java @@ -27,10 +27,13 @@ import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.zip.CRC32; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputBufferFixed; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.CLibrary; import org.apache.cassandra.utils.SyncUtil; 
import org.apache.cassandra.utils.Throwables; @@ -39,7 +42,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksum; import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt; import static org.apache.cassandra.utils.Throwables.perform; -final class HintsWriter implements AutoCloseable +class HintsWriter implements AutoCloseable { static final int PAGE_SIZE = 4096; @@ -52,7 +55,7 @@ final class HintsWriter implements AutoCloseable private volatile long lastSyncPosition = 0L; - private HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC) + protected HintsWriter(File directory, HintsDescriptor descriptor, File file, FileChannel channel, int fd, CRC32 globalCRC) { this.directory = directory; this.descriptor = descriptor; @@ -86,7 +89,14 @@ final class HintsWriter implements AutoCloseable throw e; } - return new HintsWriter(directory, descriptor, file, channel, fd, crc); + if (descriptor.isCompressed()) + { + return new CompressedHintsWriter(directory, descriptor, file, channel, fd, crc); + } + else + { + return new HintsWriter(directory, descriptor, file, channel, fd, crc); + } } HintsDescriptor descriptor() @@ -138,6 +148,15 @@ final class HintsWriter implements AutoCloseable } /** + * Writes byte buffer into the file channel. Buffer should be flipped before calling this + */ + protected void writeBuffer(ByteBuffer bb) throws IOException + { + updateChecksum(globalCRC, bb); + channel.write(bb); + } + + /** * The primary goal of the Session class is to be able to share the same buffers among potentially dozens or hundreds * of hints writers, and ensure that their contents are always written to the underlying channels in the end. 
*/ @@ -157,6 +176,12 @@ final class HintsWriter implements AutoCloseable this.initialSize = initialSize; } + @VisibleForTesting + long getBytesWritten() + { + return bytesWritten; + } + long position() { return initialSize + bytesWritten; @@ -173,22 +198,24 @@ final class HintsWriter implements AutoCloseable { bytesWritten += hint.remaining(); - // if the hint fits in the aggregation buffer, then just update the aggregation buffer, - // otherwise write both the aggregation buffer and the new buffer to the channel + // if the hint to write won't fit in the aggregation buffer, flush it + if (hint.remaining() > buffer.remaining()) + { + buffer.flip(); + writeBuffer(buffer); + buffer.clear(); + } + + // if the hint fits in the aggregation buffer, then update the aggregation buffer, + // otherwise write the hint buffer to the channel if (hint.remaining() <= buffer.remaining()) { buffer.put(hint); - return; } - - buffer.flip(); - - // update file-global CRC checksum - updateChecksum(globalCRC, buffer); - updateChecksum(globalCRC, hint); - - channel.write(new ByteBuffer[] { buffer, hint }); - buffer.clear(); + else + { + writeBuffer(hint); + } } /** @@ -247,8 +274,7 @@ final class HintsWriter implements AutoCloseable if (buffer.remaining() > 0) { - updateChecksum(globalCRC, buffer); - channel.write(buffer); + writeBuffer(buffer); } buffer.clear(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java index d627fcf..73fd040 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -62,7 +62,7 @@ public class HintsCatalogTest writeDescriptor(directory, descriptor3); writeDescriptor(directory, descriptor4); - HintsCatalog 
catalog = HintsCatalog.load(directory); + HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS); assertEquals(2, catalog.stores().count()); HintsStore store1 = catalog.get(hostId1); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java new file mode 100644 index 0000000..d6a08ca --- /dev/null +++ b/test/unit/org/apache/cassandra/hints/HintsCompressionTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.hints; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.io.compress.DeflateCompressor; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.compress.LZ4Compressor; +import org.apache.cassandra.io.compress.SnappyCompressor; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.UUIDGen; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class HintsCompressionTest +{ + private static final String KEYSPACE = "hints_compression_test"; + private static final String TABLE = "table"; + + + private static Mutation createMutation(int index, long timestamp) + { + CFMetaData table = Schema.instance.getCFMetaData(KEYSPACE, TABLE); + return new RowUpdateBuilder(table, timestamp, bytes(index)) + .clustering(bytes(index)) + .add("val", bytes(index)) + .build(); + } + + private static Hint createHint(int idx, long baseTimestamp) + { + long timestamp = baseTimestamp + idx; + return Hint.create(createMutation(idx, TimeUnit.MILLISECONDS.toMicros(timestamp)), timestamp); + } + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + } + + private ImmutableMap<String, 
Object> params(Class<? extends ICompressor> compressorClass) + { + ImmutableMap<String, Object> compressionParams = ImmutableMap.<String, Object>builder() + .put(ParameterizedClass.CLASS_NAME, compressorClass.getSimpleName()) + .build(); + return ImmutableMap.<String, Object>builder() + .put(HintsDescriptor.COMPRESSION, compressionParams) + .build(); + } + + public void multiFlushAndDeserializeTest(Class<? extends ICompressor> compressorClass) throws Exception + { + int hintNum = 0; + int bufferSize = HintsWriteExecutor.WRITE_BUFFER_SIZE; + List<Hint> hints = new LinkedList<>(); + + UUID hostId = UUIDGen.getTimeUUID(); + long ts = System.currentTimeMillis(); + + HintsDescriptor descriptor = new HintsDescriptor(hostId, ts, params(compressorClass)); + File dir = Files.createTempDir(); + try (HintsWriter writer = HintsWriter.create(dir, descriptor)) + { + assert writer instanceof CompressedHintsWriter; + + ByteBuffer writeBuffer = ByteBuffer.allocateDirect(bufferSize); + try (HintsWriter.Session session = writer.newSession(writeBuffer)) + { + while (session.getBytesWritten() < bufferSize * 3) + { + Hint hint = createHint(hintNum, ts+hintNum); + session.append(hint); + hints.add(hint); + hintNum++; + } + } + } + + try (HintsReader reader = HintsReader.open(new File(dir, descriptor.fileName()))) + { + List<Hint> deserialized = new ArrayList<>(hintNum); + + for (HintsReader.Page page: reader) + { + Iterator<Hint> iterator = page.hintsIterator(); + while (iterator.hasNext()) + { + deserialized.add(iterator.next()); + } + } + + Assert.assertEquals(hints.size(), deserialized.size()); + hintNum = 0; + for (Hint expected: hints) + { + HintsTestUtil.assertHintsEqual(expected, deserialized.get(hintNum)); + hintNum++; + } + } + } + + @Test + public void lz4Compressor() throws Exception + { + multiFlushAndDeserializeTest(LZ4Compressor.class); + } + + @Test + public void snappyCompressor() throws Exception + { + multiFlushAndDeserializeTest(SnappyCompressor.class); + } + + @Test + 
public void deflateCompressor() throws Exception + { + multiFlushAndDeserializeTest(DeflateCompressor.class); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c20566fa/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java index 85e4b69..cc97df0 100644 --- a/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java +++ b/test/unit/org/apache/cassandra/hints/LegacyHintsMigratorTest.java @@ -80,7 +80,7 @@ public class LegacyHintsMigratorTest // truncate system.hints to enseure nothing inside Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).truncateBlocking(); new LegacyHintsMigrator(directory, 128 * 1024 * 1024).migrate(); - HintsCatalog catalog = HintsCatalog.load(directory); + HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS); assertEquals(0, catalog.stores().count()); } @@ -125,7 +125,7 @@ public class LegacyHintsMigratorTest // validate that the hints table is truncated now assertTrue(Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_HINTS).isEmpty()); - HintsCatalog catalog = HintsCatalog.load(directory); + HintsCatalog catalog = HintsCatalog.load(directory, HintsService.EMPTY_PARAMS); // assert that we've correctly loaded 10 hints stores assertEquals(10, catalog.stores().count());
