Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be211749
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be211749
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be211749

Branch: refs/heads/trunk
Commit: be2117492f3d9ace24bbf18e57e94b2a08965763
Parents: 2a24acf 95839aa
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Fri Sep 29 15:31:37 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Fri Sep 29 15:32:32 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../cassandra/db/commitlog/CommitLogReader.java | 53 +++++++++++++++++++-
 .../db/commitlog/CommitLogReplayer.java         |  3 --
 .../db/commitlog/CommitLogSegment.java          |  2 +-
 .../db/commitlog/CompressedSegment.java         |  2 +-
 .../db/commitlog/EncryptedSegment.java          |  4 +-
 .../db/commitlog/MemoryMappedSegment.java       |  2 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 53 ++++++++++++++++++++
 8 files changed, 110 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6c3a1d0,7ff61d3..a782333
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,9 +1,20 @@@
 -3.0.15
 +3.11.1
- =======
 + * Fix the computation of cdc_total_space_in_mb for exabyte filesystems (CASSANDRA-13808)
 + * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Filter header only commit logs before recovery (CASSANDRA-13918)
   * AssertionError prepending to a list (CASSANDRA-13149)
   * Fix support for SuperColumn tables (CASSANDRA-12373)
 - * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 - * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
   * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
   * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 8c04329,0000000..4d74557
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,515 -1,0 +1,564 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.zip.CRC32;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.SerializationHelper;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 +
 +public class CommitLogReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
 +
 +    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 +
 +    @VisibleForTesting
 +    public static final int ALL_MUTATIONS = -1;
 +    private final CRC32 checksum;
 +    private final Map<UUID, AtomicInteger> invalidMutations;
 +
 +    private byte[] buffer;
 +
 +    public CommitLogReader()
 +    {
 +        checksum = new CRC32();
 +        invalidMutations = new HashMap<>();
 +        buffer = new byte[4096];
 +    }
 +
 +    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
 +    {
 +        return invalidMutations.entrySet();
 +    }
 +
 +    /**
 +     * Reads all passed in files with no minimum, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
 +    {
 +        readAllFiles(handler, files, CommitLogPosition.NONE);
 +    }
 +
++    private static boolean shouldSkip(File file) throws IOException, ConfigurationException
++    {
++        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
++        if (desc.version < CommitLogDescriptor.VERSION_21)
++        {
++            return false;
++        }
++        try(RandomAccessReader reader = RandomAccessReader.open(file))
++        {
++            CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
++            int end = reader.readInt();
++            long filecrc = reader.readInt() & 0xffffffffL;
++            return end == 0 && filecrc == 0;
++        }
++    }
++
++    private static List<File> filterCommitLogFiles(File[] toFilter)
++    {
++        List<File> filtered = new ArrayList<>(toFilter.length);
++        for (File file: toFilter)
++        {
++            try
++            {
++                if (shouldSkip(file))
++                {
++                    logger.info("Skipping playback of empty log: {}", 
file.getName());
++                }
++                else
++                {
++                    filtered.add(file);
++                }
++            }
++            catch (Exception e)
++            {
++                // let recover deal with it
++                filtered.add(file);
++            }
++        }
++
++        return filtered;
++    }
++
 +    /**
 +     * Reads all passed in files with minPosition, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
 +    {
-         for (int i = 0; i < files.length; i++)
-             readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
++        List<File> filteredLogs = filterCommitLogFiles(files);
++        int i = 0;
++        for (File file: filteredLogs)
++        {
++            i++;
++            readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, i == filteredLogs.size());
++        }
 +    }
 +
 +    /**
 +     * Reads passed in file fully
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads passed in file fully, up to mutationLimit count
 +     */
 +    @VisibleForTesting
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads mutations from file, handing them off to handler
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param file CommitLogSegment file to read
 +     * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
 +     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
 +     * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
 +     *
 +     * @throws IOException
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler,
 +                                     File file,
 +                                     CommitLogPosition minPosition,
 +                                     int mutationLimit,
 +                                     boolean tolerateTruncation) throws IOException
 +    {
 +        // just transform from the file name (no reading of headers) to determine version
 +        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
 +        try(RandomAccessReader reader = RandomAccessReader.open(file))
 +        {
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
 +            {
 +                if (!shouldSkipSegmentId(file, desc, minPosition))
 +                {
 +                    if (minPosition.segmentId == desc.id)
 +                        reader.seek(minPosition.position);
 +                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                    statusTracker.errorContext = desc.fileName();
 +                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
 +                }
 +                return;
 +            }
 +
 +            final long segmentIdFromFilename = desc.id;
 +            try
 +            {
 +                // The following call can either throw or legitimately return null. For either case, we need to check
 +                // desc outside this block and set it to null in the exception case.
 +                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            }
 +            catch (Exception e)
 +            {
 +                desc = null;
 +            }
 +            if (desc == null)
 +            {
 +                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
 +                // whether or not we continue with startup will depend on whether this is the last segment
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Could not read commit log descriptor in file %s", file),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            if (segmentIdFromFilename != desc.id)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
 +                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
 +                                                                               CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
 +                                                                               false)))
 +                {
 +                    return;
 +                }
 +            }
 +
 +            if (shouldSkipSegmentId(file, desc, minPosition))
 +                return;
 +
 +            CommitLogSegmentReader segmentReader;
 +            try
 +            {
 +                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
 +            }
 +            catch(Exception e)
 +            {
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Unable to create segment reader for commit log file: %s", e),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            try
 +            {
 +                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
 +                {
 +                    // Only tolerate truncation if we allow in both global and segment
 +                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
 +
 +                    // Skip segments that are completely behind the desired minPosition
 +                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
 +                        continue;
 +
 +                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
 +
 +                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
 +                    if (!statusTracker.shouldContinue())
 +                        break;
 +                }
 +            }
 +            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
 +            // is wrapping an IOException.
 +            catch (RuntimeException re)
 +            {
 +                if (re.getCause() instanceof IOException)
 +                    throw (IOException) re.getCause();
 +                throw re;
 +            }
 +            logger.debug("Finished reading {}", file);
 +        }
 +    }
 +
 +    /**
 +     * Any segment with id >= minPosition.segmentId is a candidate for read.
 +     */
 +    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
 +    {
 +        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
 +            file.getPath(),
 +            desc.version,
 +            desc.getMessagingVersion(),
 +            desc.compression);
 +
 +        if (minPosition.segmentId > desc.id)
 +        {
 +            logger.trace("Skipping read of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Reads a section of a file containing mutations
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param reader FileDataInput / logical buffer containing commitlog mutations
 +     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
 +     * @param end logical numeric end of the segment being read
 +     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
 +     * @param desc Descriptor for CommitLog serialization
 +     */
 +    private void readSection(CommitLogReadHandler handler,
 +                             FileDataInput reader,
 +                             CommitLogPosition minPosition,
 +                             int end,
 +                             ReadStatusTracker statusTracker,
 +                             CommitLogDescriptor desc) throws IOException
 +    {
 +        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
 +        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
 +            reader.seek(minPosition.position);
 +
 +        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
 +        {
 +            long mutationStart = reader.getFilePointer();
 +            if (logger.isTraceEnabled())
 +                logger.trace("Reading mutation at {}", mutationStart);
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
 +                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
 +                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
 +                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
 +                // from the end of the file, which means that we'll be unable to read a full int and instead
 +                // read an EOF here
 +                if(end - reader.getFilePointer() < 4)
 +                {
 +                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.trace("Encountered end of segment marker at {}", 
reader.getFilePointer());
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // Mutation must be at LEAST 10 bytes:
 +                //    3 for a non-empty Keyspace
 +                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
 +                //    4 bytes for column count.
 +                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
 +                checksum.reset();
 +                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +
 +                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
 +            }
 +            catch (EOFException eof)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.EOF,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                return;
 +            }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                continue;
 +            }
 +
 +            long mutationPosition = reader.getFilePointer();
 +            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
 +
 +            // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
 +            // are before this mark.
 +            if (mutationPosition >= minPosition.position)
 +                statusTracker.addProcessedMutation();
 +        }
 +    }
 +
 +    /**
 +     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param inputBuffer raw byte array w/Mutation data
 +     * @param size deserialized size of mutation
 +     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
 +     * @param entryLocation filePointer offset of mutation within CommitLogSegment
 +     * @param desc CommitLogDescriptor being worked on
 +     */
 +    @VisibleForTesting
 +    protected void readMutation(CommitLogReadHandler handler,
 +                                byte[] inputBuffer,
 +                                int size,
 +                                CommitLogPosition minPosition,
 +                                final int entryLocation,
 +                                final CommitLogDescriptor desc) throws IOException
 +    {
 +        // For now, we need to go through the motions of deserializing the mutation to determine its size and move
 +        // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
 +        boolean shouldReplay = entryLocation > minPosition.position;
 +
 +        final Mutation mutation;
 +        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 +        {
 +            mutation = Mutation.serializer.deserialize(bufIn,
 +                                                       desc.getMessagingVersion(),
 +                                                       SerializationHelper.Flag.LOCAL);
 +            // doublecheck that what we read is [still] valid for the current schema
 +            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 +                upd.validate();
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +
 +            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +
 +            // Checksum passed so this error can't be permissible.
 +            handler.handleUnrecoverableError(new CommitLogReadException(
 +                String.format(
 +                    "Unexpected error deserializing mutation; saved to %s.  " 
+
 +                    "This may be caused by replaying a mutation against a 
table with the same name but incompatible schema.  " +
 +                    "Exception follows: %s", f.getAbsolutePath(), t),
 +                CommitLogReadErrorReason.MUTATION_ERROR,
 +                false));
 +            return;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Read mutation for {}.{}: {}", 
mutation.getKeyspaceName(), mutation.key(),
 +                         "{" + 
StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 +
 +        if (shouldReplay)
 +            handler.handleMutation(mutation, size, entryLocation, desc);
 +    }
 +
 +    /**
 +     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
 +     */
 +    private static class CommitLogFormat
 +    {
 +        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +
 +        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                    checksum.update(serializedSize);
 +                    break;
 +                // Changed format in 2.0
 +                default:
 +                    updateChecksumInt(checksum, serializedSize);
 +                    break;
 +            }
 +        }
 +
 +        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +    }
 +
 +    private static class ReadStatusTracker
 +    {
 +        private int mutationsLeft;
 +        public String errorContext = "";
 +        public boolean tolerateErrorsInSection;
 +        private boolean error;
 +
 +        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
 +        {
 +            this.mutationsLeft = mutationLimit;
 +            this.tolerateErrorsInSection = tolerateErrorsInSection;
 +        }
 +
 +        public void addProcessedMutation()
 +        {
 +            if (mutationsLeft == ALL_MUTATIONS)
 +                return;
 +            --mutationsLeft;
 +        }
 +
 +        public boolean shouldContinue()
 +        {
 +            return !error && (mutationsLeft != 0 || mutationsLeft == 
ALL_MUTATIONS);
 +        }
 +
 +        public void requestTermination()
 +        {
 +            error = true;
 +        }
 +    }
 +}
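
The crux of this patch is shouldSkip() above: commit log segments are pre-allocated and zero-filled, so a segment that never made it past its descriptor header is followed by an all-zero first sync marker. After readHeader() consumes the descriptor, the next two ints are the marker's "next marker" pointer and its CRC, and both being zero means the segment was never synced. A minimal JDK-only sketch of the same check, where headerLength stands in for however many bytes CommitLogDescriptor.readHeader consumes (that length is version and encryption dependent, so it is an assumption here):

    import java.io.DataInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class HeaderOnlySegmentCheck
    {
        // Returns true when the segment looks header-only: the first sync
        // marker's "next marker" pointer and CRC are both zero, i.e. the
        // file was pre-allocated but never synced past its header.
        // headerLength is a hypothetical stand-in for the descriptor size.
        static boolean looksHeaderOnly(File file, int headerLength) throws IOException
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream(file)))
            {
                if (in.skipBytes(headerLength) != headerLength)
                    return false;                        // too short; let replay report it
                int end = in.readInt();                  // next sync marker position
                long crc = in.readInt() & 0xffffffffL;   // marker CRC, widened as in the patch
                return end == 0 && crc == 0;
            }
        }
    }

A truncated file makes readInt() throw EOFException, which is the analogue of the patch's behavior: filterCommitLogFiles() deliberately keeps a file when the check throws, so a segment with a genuinely corrupt header still reaches the replayer's normal error handling instead of being silently dropped.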

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 4d2971f,4fd263c..ea62fd8
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -18,23 -18,26 +18,20 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import java.io.DataOutputStream;
 -import java.io.EOFException;
  import java.io.File;
 -import java.io.FileOutputStream;
  import java.io.IOException;
 -import java.nio.ByteBuffer;
  import java.util.*;
- import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicInteger;
 -import java.util.zip.CRC32;
  
 +import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Predicate;
- import com.google.common.base.Throwables;
 -import com.google.common.collect.HashMultimap;
 -import com.google.common.collect.Iterables;
 -import com.google.common.collect.Multimap;
 -import com.google.common.collect.Ordering;
 -
 +import com.google.common.collect.*;
- import com.google.common.util.concurrent.Uninterruptibles;
  import org.apache.commons.lang3.StringUtils;
 +import org.cliffc.high_scale_lib.NonBlockingHashSet;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.CFMetaData;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 8670c64,236a1b1..a618d0b
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -337,20 -315,8 +337,20 @@@ public abstract class CommitLogSegmen
          syncComplete.signalAll();
      }
  
 +    /**
 +     * Create a sync marker to delineate sections of the commit log, typically created on each sync of the file.
 +     * The sync marker consists of a file pointer to where the next sync marker should be (effectively declaring the length
 +     * of this section), as well as a CRC value.
 +     *
 +     * @param buffer buffer in which to write out the sync marker.
 +     * @param offset Offset into the {@code buffer} at which to write the sync marker.
 +     * @param filePos The current position in the target file where the sync marker will be written (most likely different from the buffer position).
 +     * @param nextMarker The file position of where the next sync marker should be.
 +     */
-     protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker)
+     protected static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker)
      {
 +        if (filePos > nextMarker)
 +            throw new IllegalArgumentException(String.format("commit log sync marker's current file position %d is greater than next file position %d", filePos, nextMarker));
          CRC32 crc = new CRC32();
          updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL));
          updateChecksumInt(crc, (int) (id >>> 32));
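
The signature change above threads the segment id into the marker checksum as two int halves. The rest of the method falls outside this hunk, but the marker itself is two ints (SYNC_MARKER_SIZE bytes): the file position of the next marker, then a CRC. A sketch of the full routine under two stated assumptions: that the CRC also covers filePos, and that FBUtilities.updateChecksumInt feeds the int's four big-endian bytes to the checksum:

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class SyncMarkerSketch
    {
        // Assumed equivalent of FBUtilities.updateChecksumInt: push the four
        // big-endian bytes of v into the CRC.
        static void updateChecksumInt(CRC32 crc, int v)
        {
            crc.update(new byte[]{ (byte) (v >>> 24), (byte) (v >>> 16),
                                   (byte) (v >>> 8), (byte) v });
        }

        // Marker = [nextMarker int][crc int]; the CRC is seeded with the
        // segment id (as in the hunk above) and, assumed here, with filePos.
        static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker)
        {
            CRC32 crc = new CRC32();
            updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL)); // low 32 bits of id
            updateChecksumInt(crc, (int) (id >>> 32));        // high 32 bits of id
            updateChecksumInt(crc, filePos);                  // assumed, not shown in the hunk
            buffer.putInt(offset, nextMarker);
            buffer.putInt(offset + 4, (int) crc.getValue());
        }
    }

This layout is also what makes the header-only filter in CommitLogReader cheap: a never-synced segment still has all-zero bytes where this marker would live.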

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 288b766,c00ce18..967db15
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -77,8 -134,8 +77,8 @@@ public class CompressedSegment extends 
  
              // Only one thread can be here at a given time.
              // Protected by synchronization on CommitLogSegment.sync().
-             writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
+             writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
 -            commitLog.allocator.addSize(compressedBuffer.limit());
 +            manager.addSize(compressedBuffer.limit());
              channel.write(compressedBuffer);
              assert channel.position() - lastWrittenPos == compressedBuffer.limit();
              lastWrittenPos = channel.position();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 4ca1ede,0000000..87825ab
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,159 -1,0 +1,159 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Map;
 +import javax.crypto.Cipher;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.compress.ICompressor;
 +import org.apache.cassandra.security.EncryptionUtils;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.utils.Hex;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
 +
 +/**
 + * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the amount of data fed into
 + * the encryption algorithm.
 + *
 + * The format of the encrypted commit log is as follows:
 + * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
 + * - a series of 'sync segments' that are written every time the commit log is sync()'ed
-  * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
++ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(long, ByteBuffer, int, int, int)}
 + * -- total plain text length for this section
 + * -- a series of encrypted data blocks, each of which contains:
 + * --- the length of the encrypted block (cipher text)
 + * --- the length of the unencrypted data (compressed text)
 + * --- the encrypted block, which contains:
 + * ---- the length of the plain text (raw) data
 + * ---- block of compressed data
 + *
 + * Notes:
 + * - "length of the unencrypted data" is different from the length of the resulting decrypted buffer, as encryption adds padding
 + * to the output buffer, and we need to ignore that padding when processing.
 + */
 +public class EncryptedSegment extends FileDirectSegment
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
 +
 +    private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
 +
 +    private final EncryptionContext encryptionContext;
 +    private final Cipher cipher;
 +
 +    public EncryptedSegment(CommitLog commitLog, AbstractCommitLogSegmentManager manager)
 +    {
 +        super(commitLog, manager);
 +        this.encryptionContext = commitLog.configuration.getEncryptionContext();
 +
 +        try
 +        {
 +            cipher = encryptionContext.getEncryptor();
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSWriteError(e, logFile);
 +        }
 +        logger.debug("created a new encrypted commit log segment: {}", 
logFile);
 +        // Keep reusable buffers on-heap regardless of compression preference 
so we avoid copy off/on repeatedly during decryption
 +        
manager.getBufferPool().setPreferredReusableBufferType(BufferType.ON_HEAP);
 +    }
 +
 +    protected Map<String, String> additionalHeaderParameters()
 +    {
 +        Map<String, String> map = encryptionContext.toHeaderParameters();
 +        map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
 +        return map;
 +    }
 +
 +    ByteBuffer createBuffer(CommitLog commitLog)
 +    {
 +        // Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
 +        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
 +        return manager.getBufferPool().createBuffer(BufferType.ON_HEAP);
 +    }
 +
 +    void write(int startMarker, int nextMarker)
 +    {
 +        int contentStart = startMarker + SYNC_MARKER_SIZE;
 +        final int length = nextMarker - contentStart;
 +        // The length may be 0 when the segment is being closed.
 +        assert length > 0 || length == 0 && !isStillAllocating();
 +
 +        final ICompressor compressor = encryptionContext.getCompressor();
 +        final int blockSize = encryptionContext.getChunkLength();
 +        try
 +        {
 +            ByteBuffer inputBuffer = buffer.duplicate();
 +            inputBuffer.limit(contentStart + length).position(contentStart);
 +            ByteBuffer buffer = manager.getBufferPool().getThreadLocalReusableBuffer(DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +            // save space for the sync marker at the beginning of this section
 +            final long syncMarkerPosition = lastWrittenPos;
 +            channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
 +
 +            // loop over the segment data in encryption buffer sized chunks
 +            while (contentStart < nextMarker)
 +            {
 +                int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
 +                ByteBuffer slice = inputBuffer.duplicate();
 +                slice.limit(contentStart + nextBlockSize).position(contentStart);
 +
 +                buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
 +
 +                // reuse the same buffer for the input and output of the encryption operation
 +                buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
 +
 +                contentStart += nextBlockSize;
 +                manager.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
 +            }
 +
 +            lastWrittenPos = channel.position();
 +
 +            // rewind to the beginning of the section and write out the sync marker
 +            buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
-             writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
++            writeSyncMarker(id, buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
 +            buffer.putInt(SYNC_MARKER_SIZE, length);
 +            buffer.rewind();
 +            manager.addSize(buffer.limit());
 +
 +            channel.position(syncMarkerPosition);
 +            channel.write(buffer);
 +
 +            SyncUtil.force(channel, true);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new FSWriteError(e, getPath());
 +        }
 +    }
 +
 +    public long onDiskSize()
 +    {
 +        return lastWrittenPos;
 +    }
 +}
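
One detail worth calling out in write() above: the ternary picking nextBlockSize is just "at most blockSize, capped by what remains before nextMarker". A tiny self-contained illustration of the chunk walk (the names mirror the method's locals; the sizes are made up for the example):

    public class ChunkWalk
    {
        public static void main(String[] args)
        {
            int contentStart = 0, nextMarker = 10_000, blockSize = 4096;
            while (contentStart < nextMarker)
            {
                // same value as: nextMarker - blockSize > contentStart
                //                    ? blockSize : nextMarker - contentStart
                int nextBlockSize = Math.min(blockSize, nextMarker - contentStart);
                System.out.printf("chunk [%d, %d)%n", contentStart, contentStart + nextBlockSize);
                contentStart += nextBlockSize;
            }
        }
    }

Each chunk is compressed, then encrypted and written, so lastWrittenPos is only known after the loop; that is why the section header is written by seeking back to syncMarkerPosition afterwards.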

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 250b3e4,3a16d91..bbf9ad2
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@@ -76,10 -77,9 +76,10 @@@ public class MemoryMappedSegment extend
  
          // write previous sync marker to point to next sync marker
         // we don't chain the crcs here to ensure this method is idempotent if it fails
-         writeSyncMarker(buffer, startMarker, startMarker, nextMarker);
+         writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker);
  
 -        try {
 +        try
 +        {
              SyncUtil.force((MappedByteBuffer) buffer);
          }
         catch (Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/be211749/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index aab55a5,9e9ee53..267813e
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -28,9 -30,11 +28,10 @@@ import java.util.stream.Collectors
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
  
 +import com.google.common.collect.Iterables;
 +
 +import org.junit.*;
+ import com.google.common.io.Files;
 -import org.junit.Assert;
 -import org.junit.Before;
 -import org.junit.BeforeClass;
 -import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.junit.runners.Parameterized;
  import org.junit.runners.Parameterized.Parameters;
@@@ -155,12 -129,64 +156,64 @@@ public class CommitLogTes
      }
  
      @Test
 -    public void testRecoveryWithEmptyFinalLog() throws Exception
 +    public void testRecoveryWithFinalEmptyLog() throws Exception
      {
         // Even though it's empty, it's the last commitlog segment, so allowTruncation=true should allow it to pass
 -        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version)  });
 +        CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)});
      }
  
+     /**
+      * Since commit log segments can be allocated before they're needed, the commit log file with the highest
+      * id isn't necessarily the last log that we wrote to. We should remove header-only logs on recover so we
+      * can tolerate truncated logs
+      */
+     @Test
+     public void testHeaderOnlyFileFiltering() throws Exception
+     {
+         File directory = Files.createTempDir();
+ 
 -        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null);
 -        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null);
++        CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext());
++        CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null, DatabaseDescriptor.getEncryptionContext());
+ 
+         ByteBuffer buffer;
+ 
+         // this has a header and malformed data
+         File file1 = new File(directory, desc1.fileName());
+         buffer = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buffer, desc1);
+         int pos = buffer.position();
+         CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128);
+         buffer.position(pos + 8);
+         buffer.putInt(5);
+         buffer.putInt(6);
+ 
+         try (OutputStream lout = new FileOutputStream(file1))
+         {
+             lout.write(buffer.array());
+         }
+ 
+         // this has only a header
+         File file2 = new File(directory, desc2.fileName());
+         buffer = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buffer, desc2);
+         try (OutputStream lout = new FileOutputStream(file2))
+         {
+             lout.write(buffer.array());
+         }
+ 
+         // one corrupt file and one header only file should be ok
+         runExpecting(() -> {
 -            CommitLog.instance.recover(file1, file2);
++            CommitLog.instance.recoverFiles(file1, file2);
+             return null;
+         }, null);
+ 
+         // 2 corrupt files and one header only file should fail
+         runExpecting(() -> {
 -            CommitLog.instance.recover(file1, file1, file2);
++            CommitLog.instance.recoverFiles(file1, file1, file2);
+             return null;
+         }, CommitLogReplayException.class);
+     }
+ 
      @Test
      public void testRecoveryWithEmptyLog20() throws Exception
      {


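For context on the test above: file1 gets a valid descriptor header and a valid sync marker, then two ints (5 and 6) immediately after the marker, and a claimed mutation size of 5 is below the 10-byte minimum enforced by readSection(), so the file passes the header-only filter but fails replay as corrupt. file2 is header-only (all zeros after the header) and should be filtered out before replay. A pure-JDK fixture in the same spirit, reusing the hypothetical looksHeaderOnly() sketch shown after CommitLogReader.java above (headerLength is again a stand-in):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class HeaderOnlyFixture
    {
        public static void main(String[] args) throws IOException
        {
            int headerLength = 32; // arbitrary; real headers are version-dependent
            File f = File.createTempFile("pretend-segment", ".log");
            try (FileOutputStream out = new FileOutputStream(f))
            {
                byte[] bytes = new byte[headerLength + 1024]; // zero padding after header
                for (int i = 0; i < headerLength; i++)
                    bytes[i] = (byte) 0xAB;                   // fake header content
                out.write(bytes);
            }
            // header-only: the first sync marker region is all zeros
            System.out.println(HeaderOnlySegmentCheck.looksHeaderOnly(f, headerLength)); // true
        }
    }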