Repository: cassandra
Updated Branches:
  refs/heads/trunk 9bf9ea740 -> 1f74142d7


CDC Follow-ups

Patch by jmckenzie; reviewed by blambov for CASSANDRA-12018


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f74142d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f74142d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f74142d

Branch: refs/heads/trunk
Commit: 1f74142d756b3201acf0fe684943c972e7471782
Parents: 9bf9ea7
Author: Josh McKenzie <[email protected]>
Authored: Mon Jul 11 15:18:04 2016 -0400
Committer: Josh McKenzie <[email protected]>
Committed: Mon Jul 11 15:18:04 2016 -0400

----------------------------------------------------------------------
 .../org/apache/cassandra/db/Directories.java    | 11 +++--
 .../cassandra/db/commitlog/CommitLogReader.java |  3 +-
 .../commitlog/CommitLogSegmentManagerCDC.java   | 49 ++++++++++++--------
 .../db/commitlog/CompressedSegment.java         | 15 +-----
 .../db/commitlog/EncryptedSegment.java          | 16 +++----
 .../db/commitlog/SimpleCachedBufferPool.java    | 17 +++++--
 .../utils/DirectorySizeCalculator.java          | 39 ++--------------
 .../test/microbench/DirectorySizerBench.java    |  1 -
 .../cassandra/db/commitlog/CommitLogTest.java   | 13 ------
 9 files changed, 62 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index 2a55992..87527e8 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -920,7 +920,7 @@ public class Directories
         if (!input.isDirectory())
             return 0;
 
-        SSTableSizeSummer visitor = new 
SSTableSizeSummer(sstableLister(Directories.OnTxnErr.THROW).listFiles());
+        SSTableSizeSummer visitor = new SSTableSizeSummer(input, 
sstableLister(Directories.OnTxnErr.THROW).listFiles());
         try
         {
             Files.walkFileTree(input.toPath(), visitor);
@@ -1006,9 +1006,11 @@ public class Directories
     
     private class SSTableSizeSummer extends DirectorySizeCalculator
     {
-        SSTableSizeSummer(List<File> files)
+        private final HashSet<File> toSkip;
+        SSTableSizeSummer(File path, List<File> files)
         {
-            super(files);
+            super(path);
+            toSkip = new HashSet<>(files);
         }
 
         @Override
@@ -1019,8 +1021,7 @@ public class Directories
             return pair != null
                     && pair.left.ksname.equals(metadata.ksName)
                     && pair.left.cfname.equals(metadata.cfName)
-                    && !visited.contains(fileName)
-                    && !alive.contains(fileName);
+                    && !toSkip.contains(fileName);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 4594080..c797482 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -190,7 +190,8 @@ public class CommitLogReader
                 ReadStatusTracker statusTracker = new 
ReadStatusTracker(mutationLimit, tolerateTruncation);
                 for (CommitLogSegmentReader.SyncSegment syncSegment : 
segmentReader)
                 {
-                    statusTracker.tolerateErrorsInSection &= 
syncSegment.toleratesErrorsInSection;
+                    // Only tolerate truncation if we allow in both global and 
segment
+                    statusTracker.tolerateErrorsInSection = tolerateTruncation 
& syncSegment.toleratesErrorsInSection;
 
                     // Skip segments that are completely behind the desired 
minPosition
                     if (desc.id == minPosition.segmentId && 
syncSegment.endPosition < minPosition.position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
index 15944bd..1fac735 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManagerCDC.java
@@ -20,9 +20,11 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
@@ -35,6 +37,7 @@ import 
org.apache.cassandra.db.commitlog.CommitLogSegment.CDCState;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.DirectorySizeCalculator;
+import org.apache.cassandra.utils.NoSpamLogger;
 
 public class CommitLogSegmentManagerCDC extends AbstractCommitLogSegmentManager
 {
@@ -116,6 +119,12 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
         if (mutation.trackedByCDC() && segment.getCDCState() == 
CDCState.FORBIDDEN)
         {
             cdcSizeTracker.submitOverflowSizeRecalculation();
+            NoSpamLogger.log(logger,
+                             NoSpamLogger.Level.WARN,
+                             10,
+                             TimeUnit.SECONDS,
+                             "Rejecting Mutation containing CDC-enabled table. 
Free up space in {}.",
+                             DatabaseDescriptor.getCDCLogLocation());
             throw new WriteTimeoutException(WriteType.CDC, 
ConsistencyLevel.LOCAL_ONE, 0, 1);
         }
     }
@@ -148,17 +157,16 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
      *
      * Allows atomic increment/decrement of unflushed size, however only 
allows increment on flushed and requires a full
      * directory walk to determine any potential deletions by CDC consumer.
-     *
-     * TODO: linux performs approximately 25% better with the following 
one-liner instead of this walker:
-     *      Arrays.stream(path.listFiles()).mapToLong(File::length).sum();
-     * However this solution is 375% slower on Windows. Revisit this and split 
logic to per-OS
      */
     private static class CDCSizeTracker extends DirectorySizeCalculator
     {
         private final RateLimiter rateLimiter = RateLimiter.create(1000.0 / 
DatabaseDescriptor.getCDCDiskCheckInterval());
         private ExecutorService cdcSizeCalculationExecutor;
         private CommitLogSegmentManagerCDC segmentManager;
-        private AtomicLong unflushedCDCSize = new AtomicLong(0);
+        private volatile long unflushedCDCSize;
+
+        // Used instead of size during walk to remove chance of over-allocation
+        private volatile long sizeInProgress = 0;
 
         CDCSizeTracker(CommitLogSegmentManagerCDC segmentManager, File path)
         {
@@ -193,7 +201,7 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
                                     ? CDCState.FORBIDDEN
                                     : CDCState.PERMITTED);
                 if (segment.getCDCState() == CDCState.PERMITTED)
-                    unflushedCDCSize.addAndGet(defaultSegmentSize());
+                    unflushedCDCSize += defaultSegmentSize();
             }
 
             // Take this opportunity to kick off a recalc to pick up any 
consumer file deletion.
@@ -207,9 +215,9 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
             {
                 // Add to flushed size before decrementing unflushed so we 
don't have a window of false generosity
                 if (segment.getCDCState() == CDCState.CONTAINS)
-                    size.addAndGet(segment.onDiskSize());
+                    size += segment.onDiskSize();
                 if (segment.getCDCState() != CDCState.FORBIDDEN)
-                    unflushedCDCSize.addAndGet(-defaultSegmentSize());
+                    unflushedCDCSize -= defaultSegmentSize();
             }
 
             // Take this opportunity to kick off a recalc to pick up any 
consumer file deletion.
@@ -251,14 +259,10 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
         {
             try
             {
-                // Since we don't synchronize around either rebuilding our 
file list or walking the tree and adding to
-                // size, it's possible we could have changes take place 
underneath us and end up with a slightly incorrect
-                // view of our flushed size by the time this walking 
completes. Given that there's a linear growth in
-                // runtime on both rebuildFileList and walkFileTree (about 50% 
for each one on runtime), and that the
-                // window for this race should be very small, this is an 
acceptable trade-off since it will be resolved
-                // on the next segment creation / deletion with a subsequent 
call to submitOverflowSizeRecalculation.
-                rebuildFileList();
+                // The Arrays.stream approach is considerably slower on 
Windows than linux
+                sizeInProgress = 0;
                 Files.walkFileTree(path.toPath(), this);
+                size = sizeInProgress;
             }
             catch (IOException ie)
             {
@@ -266,14 +270,21 @@ public class CommitLogSegmentManagerCDC extends 
AbstractCommitLogSegmentManager
             }
         }
 
-        private long addFlushedSize(long toAdd)
+        @Override
+        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
throws IOException
+        {
+            sizeInProgress += attrs.size();
+            return FileVisitResult.CONTINUE;
+        }
+
+        private void addFlushedSize(long toAdd)
         {
-            return size.addAndGet(toAdd);
+            size += toAdd;
         }
 
         private long totalCDCSizeOnDisk()
         {
-            return unflushedCDCSize.get() + size.get();
+            return unflushedCDCSize + size;
         }
 
         public void shutdown()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java 
b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index e44dfdf..645eda9 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -48,11 +48,7 @@ public class CompressedSegment extends FileDirectSegment
     {
         super(commitLog, manager, onClose);
         this.compressor = commitLog.configuration.getCompressor();
-    }
-
-    ByteBuffer allocate(int size)
-    {
-        return compressor.preferredBufferType().allocate(size);
+        
manager.getBufferPool().setPreferredReusableBufferType(compressor.preferredBufferType());
     }
 
     ByteBuffer createBuffer(CommitLog commitLog)
@@ -71,14 +67,7 @@ public class CompressedSegment extends FileDirectSegment
         try
         {
             int neededBufferSize = 
compressor.initialCompressedBufferLength(length) + COMPRESSED_MARKER_SIZE;
-            ByteBuffer compressedBuffer = 
manager.getBufferPool().getThreadLocalReusableBuffer();
-            if (compressor.preferredBufferType() != 
BufferType.typeOf(compressedBuffer) ||
-                compressedBuffer.capacity() < neededBufferSize)
-            {
-                FileUtils.clean(compressedBuffer);
-                compressedBuffer = allocate(neededBufferSize);
-                
manager.getBufferPool().setThreadLocalReusableBuffer(compressedBuffer);
-            }
+            ByteBuffer compressedBuffer = 
manager.getBufferPool().getThreadLocalReusableBuffer(neededBufferSize);
 
             ByteBuffer inputBuffer = buffer.duplicate();
             inputBuffer.limit(contentStart + length).position(contentStart);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java 
b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index e13b20a..103351e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@ -25,12 +25,12 @@ import javax.crypto.Cipher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.security.EncryptionUtils;
 import org.apache.cassandra.security.EncryptionContext;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.SyncUtil;
 
@@ -79,6 +79,8 @@ public class EncryptedSegment extends FileDirectSegment
             throw new FSWriteError(e, logFile);
         }
         logger.debug("created a new encrypted commit log segment: {}", 
logFile);
+        // Keep reusable buffers on-heap regardless of compression preference 
so we avoid copy off/on repeatedly during decryption
+        
manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
     }
 
     protected Map<String, String> additionalHeaderParameters()
@@ -108,7 +110,7 @@ public class EncryptedSegment extends FileDirectSegment
         {
             ByteBuffer inputBuffer = buffer.duplicate();
             inputBuffer.limit(contentStart + length).position(contentStart);
-            ByteBuffer buffer = 
manager.getBufferPool().getThreadLocalReusableBuffer();
+            ByteBuffer buffer = 
manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
 
             // save space for the sync marker at the beginning of this section
             final long syncMarkerPosition = lastWrittenPos;
@@ -132,21 +134,17 @@ public class EncryptedSegment extends FileDirectSegment
 
             lastWrittenPos = channel.position();
 
-            // rewind to the beginning of the section and write out the sync 
marker,
-            // reusing the one of the existing buffers
-            buffer = ByteBufferUtil.ensureCapacity(buffer, 
ENCRYPTED_SECTION_HEADER_SIZE, true);
+            // rewind to the beginning of the section and write out the sync 
marker
+            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
             writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) 
lastWrittenPos);
             buffer.putInt(SYNC_MARKER_SIZE, length);
-            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+            buffer.rewind();
             manager.addSize(buffer.limit());
 
             channel.position(syncMarkerPosition);
             channel.write(buffer);
 
             SyncUtil.force(channel, true);
-
-            if 
(manager.getBufferPool().getThreadLocalReusableBuffer().capacity() < 
buffer.capacity())
-                manager.getBufferPool().setThreadLocalReusableBuffer(buffer);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java 
b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
index 1c10c25..bdec3fc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
+++ b/src/java/org/apache/cassandra/db/commitlog/SimpleCachedBufferPool.java
@@ -57,6 +57,8 @@ public class SimpleCachedBufferPool
      */
     private final int bufferSize;
 
+    private BufferType preferredReusableBufferType = BufferType.ON_HEAP;
+
     public SimpleCachedBufferPool(int maxBufferPoolSize, int bufferSize)
     {
         this.maxBufferPoolSize = maxBufferPoolSize;
@@ -75,14 +77,19 @@ public class SimpleCachedBufferPool
         return bufferType.allocate(bufferSize);
     }
 
-    public ByteBuffer getThreadLocalReusableBuffer()
+    public ByteBuffer getThreadLocalReusableBuffer(int size)
     {
-        return reusableBufferHolder.get();
+        ByteBuffer result = reusableBufferHolder.get();
+        if (result.capacity() < size || BufferType.typeOf(result) != 
preferredReusableBufferType) {
+            FileUtils.clean(result);
+            result = preferredReusableBufferType.allocate(size);
+            reusableBufferHolder.set(result);
+        }
+        return result;
     }
 
-    public void setThreadLocalReusableBuffer(ByteBuffer buffer)
-    {
-        reusableBufferHolder.set(buffer);
+    public void setPreferredReusableBufferType(BufferType type) {
+        preferredReusableBufferType = type;
     }
 
     public void releaseBuffer(ByteBuffer buffer)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java 
b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
index aa7898c..c1fb6e0 100644
--- a/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
+++ b/src/java/org/apache/cassandra/utils/DirectorySizeCalculator.java
@@ -24,39 +24,19 @@ import java.nio.file.FileVisitResult;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.collect.ImmutableSet;
-
-import static com.google.common.collect.Sets.newHashSet;
 
 /**
  * Walks directory recursively, summing up total contents of files within.
  */
 public class DirectorySizeCalculator extends SimpleFileVisitor<Path>
 {
-    protected final AtomicLong size = new AtomicLong(0);
-    protected Set<String> visited = newHashSet(); //count each file only once
-    protected Set<String> alive = newHashSet();
+    protected volatile long size = 0;
     protected final File path;
 
     public DirectorySizeCalculator(File path)
     {
         super();
         this.path = path;
-        rebuildFileList();
-    }
-
-    public DirectorySizeCalculator(List<File> files)
-    {
-        super();
-        this.path = null;
-        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
-        for (File file : files)
-            builder.add(file.getName());
-        alive = builder.build();
     }
 
     public boolean isAcceptable(Path file)
@@ -64,24 +44,11 @@ public class DirectorySizeCalculator extends 
SimpleFileVisitor<Path>
         return true;
     }
 
-    public void rebuildFileList()
-    {
-        assert path != null;
-        ImmutableSet.Builder<String> builder = ImmutableSet.builder();
-        for (File file : path.listFiles())
-            builder.add(file.getName());
-        size.set(0);
-        alive = builder.build();
-    }
-
     @Override
     public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) 
throws IOException
     {
         if (isAcceptable(file))
-        {
-            size.addAndGet(attrs.size());
-            visited.add(file.toFile().getName());
-        }
+            size += attrs.size();
         return FileVisitResult.CONTINUE;
     }
 
@@ -93,6 +60,6 @@ public class DirectorySizeCalculator extends 
SimpleFileVisitor<Path>
 
     public long getAllocatedSize()
     {
-        return size.get();
+        return size;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
----------------------------------------------------------------------
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
index a653c81..34cbb17 100644
--- 
a/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/DirectorySizerBench.java
@@ -99,7 +99,6 @@ public class DirectorySizerBench
     @Benchmark
     public void countFiles(final Blackhole bh) throws IOException
     {
-        sizer.rebuildFileList();
         Files.walkFileTree(tempDir.toPath(), sizer);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f74142d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index eff972d..ebd868a 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -508,19 +508,6 @@ public class CommitLogTest
         runExpecting(() -> testRecovery(new CommitLogDescriptor(4, 
commitLogCompression, encryptionContext), logData), expected);
     }
 
-    protected void testRecovery(byte[] logData) throws Exception
-    {
-        Pair<File, Integer> pair = tmpFile();
-        try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
-        {
-            raf.seek(pair.right);
-            raf.write(logData);
-            raf.close();
-
-            CommitLog.instance.recoverFiles(pair.left); //CASSANDRA-1119 / 
CASSANDRA-1179 throw on failure*/
-        }
-    }
-
     @Test
     public void testTruncateWithoutSnapshot() throws ExecutionException, 
InterruptedException, IOException
     {

Reply via email to