Repository: cassandra Updated Branches: refs/heads/trunk 9bf9ea740 -> 1f74142d7
CDC Follow-ups Patch by jmckenzie; reviewed by blambov for CASSANDRA-12018 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f74142d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f74142d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f74142d Branch: refs/heads/trunk Commit: 1f74142d756b3201acf0fe684943c972e7471782 Parents: 9bf9ea7 Author: Josh McKenzie <[email protected]> Authored: Mon Jul 11 15:18:04 2016 -0400 Committer: Josh McKenzie <[email protected]> Committed: Mon Jul 11 15:18:04 2016 -0400 ---------------------------------------------------------------------- .../org/apache/cassandra/db/Directories.java | 11 +++-- .../cassandra/db/commitlog/CommitLogReader.java | 3 +- .../commitlog/CommitLogSegmentManagerCDC.java | 49 ++++++++++++-------- .../db/commitlog/CompressedSegment.java | 15 +----- .../db/commitlog/EncryptedSegment.java | 16 +++---- .../db/commitlog/SimpleCachedBufferPool.java | 17 +++++-- .../utils/DirectorySizeCalculator.java | 39 ++-------------- .../test/microbench/DirectorySizerBench.java | 1 - .../cassandra/db/commitlog/CommitLogTest.java | 13 ------ 9 files changed, 62 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/Directories.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index 2a55992..87527e8 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -920,7 +920,7 @@ public class Directories if (!input.isDirectory()) return 0; - SSTableSizeSummer visitor = new SSTableSizeSummer(sstableLister(Directories.OnTxnErr.THROW).listFiles()); + SSTableSizeSummer visitor = new SSTableSizeSummer(input, sstableLister(Directories.OnTxnErr.THROW).listFiles()); try { Files.walkFileTree(input.toPath(), visitor); @@ -1006,9 +1006,11 @@ public class Directories private class SSTableSizeSummer extends DirectorySizeCalculator { - SSTableSizeSummer(List<File> files) + private final HashSet<File> toSkip; + SSTableSizeSummer(File path, List<File> files) { - super(files); + super(path); + toSkip = new HashSet<>(files); } @Override @@ -1019,8 +1021,7 @@ public class Directories return pair != null && pair.left.ksname.equals(metadata.ksName) && pair.left.cfname.equals(metadata.cfName) - && !visited.contains(fileName) - && !alive.contains(fileName); + && !toSkip.contains(fileName); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index 4594080..c797482 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -190,7 +190,8 @@ public class CommitLogReader ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader) { - statusTracker.tolerateErrorsInSection &= syncSegment.toleratesErrorsInSection; + // Only tolerate truncation if we allow in both global and segment + statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection; // Skip segments that are completely behind the desired minPosition if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java index 15944bd..1fac735 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java @@ -20,9 +20,11 @@ package org.apache.cassandra.db.commitlog; import java.io.File; import java.io.IOException; +import java.nio.file.FileVisitResult; import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.RateLimiter; @@ -35,6 +37,7 @@ import org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.DirectorySizeCalculator; +import org.apache.cassandra.utils.NoSpamLogger; public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { @@ -116,6 +119,12 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager if (mutation.trackedByCDC() && segment.getCDCState() == CDCState.FORBIDDEN) { cdcSizeTracker.submitOverflowSizeRecalculation(); + NoSpamLogger.log(logger, + NoSpamLogger.Level.WARN, + 10, + TimeUnit.SECONDS, + "Rejecting Mutation containing CDC-enabled table. Free up space in {}.", + DatabaseDescriptor.getCDCLogLocation()); throw new WriteTimeoutException(WriteType.CDC, ConsistencyLevel.LOCAL_ONE, 0, 1); } } @@ -148,17 +157,16 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager * * Allows atomic increment/decrement of unflushed size, however only allows increment on flushed and requires a full * directory walk to determine any potential deletions by CDC consumer. - * - * TODO: linux performs approximately 25% better with the following one-liner instead of this walker: - * Arrays.stream(path.listFiles()).mapToLong(File::length).sum(); - * However this solution is 375% slower on Windows. Revisit this and split logic to per-OS */ private static class CDCSizeTracker extends DirectorySizeCalculator { private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / DatabaseDescriptor.getCDCDiskCheckInterval()); private ExecutorService cdcSizeCalculationExecutor; private CommitLogSegmentManagerCDC segmentManager; - private AtomicLong unflushedCDCSize = new AtomicLong(0); + private volatile long unflushedCDCSize; + + // Used instead of size during walk to remove chance of over-allocation + private volatile long sizeInProgress = 0; CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path) { @@ -193,7 +201,7 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager ? CDCState.FORBIDDEN : CDCState.PERMITTED); if (segment.getCDCState() == CDCState.PERMITTED) - unflushedCDCSize.addAndGet(defaultSegmentSize()); + unflushedCDCSize += defaultSegmentSize(); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. @@ -207,9 +215,9 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { // Add to flushed size before decrementing unflushed so we don't have a window of false generosity if (segment.getCDCState() == CDCState.CONTAINS) - size.addAndGet(segment.onDiskSize()); + size += segment.onDiskSize(); if (segment.getCDCState() != CDCState.FORBIDDEN) - unflushedCDCSize.addAndGet(-defaultSegmentSize()); + unflushedCDCSize -= defaultSegmentSize(); } // Take this opportunity to kick off a recalc to pick up any consumer file deletion. @@ -251,14 +259,10 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager { try { - // Since we don't synchronize around either rebuilding our file list or walking the tree and adding to - // size, it's possible we could have changes take place underneath us and end up with a slightly incorrect - // view of our flushed size by the time this walking completes. Given that there's a linear growth in - // runtime on both rebuildFileList and walkFileTree (about 50% for each one on runtime), and that the - // window for this race should be very small, this is an acceptable trade-off since it will be resolved - // on the next segment creation / deletion with a subsequent call to submitOverflowSizeRecalculation. - rebuildFileList(); + // The Arrays.stream approach is considerably slower on Windows than linux + sizeInProgress = 0; Files.walkFileTree(path.toPath(), this); + size = sizeInProgress; } catch (IOException ie) { @@ -266,14 +270,21 @@ public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager } } - private long addFlushedSize(long toAdd) + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException + { + sizeInProgress += attrs.size(); + return FileVisitResult.CONTINUE; + } + + private void addFlushedSize(long toAdd) { - return size.addAndGet(toAdd); + size += toAdd; } private long totalCDCSizeOnDisk() { - return unflushedCDCSize.get() + size.get(); + return unflushedCDCSize + size; } public void shutdown() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index e44dfdf..645eda9 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -48,11 +48,7 @@ public class CompressedSegment extends FileDirectSegment { super(commitLog, manager, onClose); this.compressor = commitLog.configuration.getCompressor(); - } - - ByteBuffer allocate(int size) - { - return compressor.preferredBufferType().allocate(size); + manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType()); } ByteBuffer createBuffer(CommitLog commitLog) @@ -71,14 +67,7 @@ public class CompressedSegment extends FileDirectSegment try { int neededBufferSize = compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE; - ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer(); - if (compressor.preferredBufferType() != BufferType.typeOf(compressedBuffer) || - compressedBuffer.capacity() < neededBufferSize) - { - FileUtils.clean(compressedBuffer); - compressedBuffer = allocate(neededBufferSize); - manager.getBufferPool().setThreadLocalReusableBuffer(compressedBuffer); - } + ByteBuffer compressedBuffer = manager.getBufferPool().getThreadLocalReusableBuffer(neededBufferSize); ByteBuffer inputBuffer = buffer.duplicate(); inputBuffer.limit(contentStart + length).position(contentStart); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java index e13b20a..103351e 100644 --- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java @@ -25,12 +25,12 @@ import javax.crypto.Cipher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.security.EncryptionUtils; import org.apache.cassandra.security.EncryptionContext; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Hex; import org.apache.cassandra.utils.SyncUtil; @@ -79,6 +79,8 @@ public class EncryptedSegment extends FileDirectSegment throw new FSWriteError(e, logFile); } logger.debug("created a new encrypted commit log segment: {}", logFile); + // Keep reusable buffers on-heap regardless of compression preference so we avoid copy off/on repeatedly during decryption + manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP); } protected Map<String, String> additionalHeaderParameters() @@ -108,7 +110,7 @@ public class EncryptedSegment extends FileDirectSegment { ByteBuffer inputBuffer = buffer.duplicate(); inputBuffer.limit(contentStart + length).position(contentStart); - ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(); + ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize()); // save space for the sync marker at the beginning of this section final long syncMarkerPosition = lastWrittenPos; @@ -132,21 +134,17 @@ public class EncryptedSegment extends FileDirectSegment lastWrittenPos = channel.position(); - // rewind to the beginning of the section and write out the sync marker, - // reusing the one of the existing buffers - buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true); + // rewind to the beginning of the section and write out the sync marker + buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos); buffer.putInt(SYNC_MARKER_SIZE, length); - buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE); + buffer.rewind(); manager.addSize(buffer.limit()); channel.position(syncMarkerPosition); channel.write(buffer); SyncUtil.force(channel, true); - - if (manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < buffer.capacity()) - manager.getBufferPool().setThreadLocalReusableBuffer(buffer); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java index 1c10c25..bdec3fc 100644 --- a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java +++ b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java @@ -57,6 +57,8 @@ public class SimpleCachedBufferPool */ private final int bufferSize; + private BufferType preferredReusableBufferType = BufferType.ON_HEAP; + public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize) { this.maxBufferPoolSize = maxBufferPoolSize; @@ -75,14 +77,19 @@ public class SimpleCachedBufferPool return bufferType.allocate(bufferSize); } - public ByteBuffer getThreadLocalReusableBuffer() + public ByteBuffer getThreadLocalReusableBuffer(int size) { - return reusableBufferHolder.get(); + ByteBuffer result = reusableBufferHolder.get(); + if (result.capacity() < size || BufferType.typeOf(result) != preferredReusableBufferType) { + FileUtils.clean(result); + result = preferredReusableBufferType.allocate(size); + reusableBufferHolder.set(result); + } + return result; } - public void setThreadLocalReusableBuffer(ByteBuffer buffer) - { - reusableBufferHolder.set(buffer); + public void setPreferredReusableBufferType(BufferType type) { + preferredReusableBufferType = type; } public void releaseBuffer(ByteBuffer buffer) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java index aa7898c..c1fb6e0 100644 --- a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java +++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java @@ -24,39 +24,19 @@ import java.nio.file.FileVisitResult; import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.attribute.BasicFileAttributes; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import com.google.common.collect.ImmutableSet; - -import static com.google.common.collect.Sets.newHashSet; /** * Walks directory recursively, summing up total contents of files within. */ public class DirectorySizeCalculator extends SimpleFileVisitor<Path> { - protected final AtomicLong size = new AtomicLong(0); - protected Set<String> visited = newHashSet(); //count each file only once - protected Set<String> alive = newHashSet(); + protected volatile long size = 0; protected final File path; public DirectorySizeCalculator(File path) { super(); this.path = path; - rebuildFileList(); - } - - public DirectorySizeCalculator(List<File> files) - { - super(); - this.path = null; - ImmutableSet.Builder<String> builder = ImmutableSet.builder(); - for (File file : files) - builder.add(file.getName()); - alive = builder.build(); } public boolean isAcceptable(Path file) @@ -64,24 +44,11 @@ public class DirectorySizeCalculator extends SimpleFileVisitor<Path> return true; } - public void rebuildFileList() - { - assert path != null; - ImmutableSet.Builder<String> builder = ImmutableSet.builder(); - for (File file : path.listFiles()) - builder.add(file.getName()); - size.set(0); - alive = builder.build(); - } - @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { if (isAcceptable(file)) - { - size.addAndGet(attrs.size()); - visited.add(file.toFile().getName()); - } + size += attrs.size(); return FileVisitResult.CONTINUE; } @@ -93,6 +60,6 @@ public class DirectorySizeCalculator extends SimpleFileVisitor<Path> public long getAllocatedSize() { - return size.get(); + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java index a653c81..34cbb17 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java @@ -99,7 +99,6 @@ public class DirectorySizerBench @Benchmark public void countFiles(final Blackhole bh) throws IOException { - sizer.rebuildFileList(); Files.walkFileTree(tempDir.toPath(), sizer); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index eff972d..ebd868a 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -508,19 +508,6 @@ public class CommitLogTest runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected); } - protected void testRecovery(byte[] logData) throws Exception - { - Pair<File, Integer> pair = tmpFile(); - try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw")) - { - raf.seek(pair.right); - raf.write(logData); - raf.close(); - - CommitLog.instance.recoverFiles(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ - } - } - @Test public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException {
