Merge branch 'cassandra-3.0' into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2399d4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2399d4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2399d4d

Branch: refs/heads/cassandra-3.11
Commit: a2399d4d309ac6b60a150ea20af8dc6f006d51ff
Parents: 2c111d1 44f79bf
Author: Jeff Jirsa <j...@jeffjirsa.net>
Authored: Sun Mar 12 21:56:11 2017 -0700
Committer: Jeff Jirsa <j...@jeffjirsa.net>
Committed: Sun Mar 12 21:57:25 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                          |  1 +
 .../cassandra/db/commitlog/CommitLogReader.java      | 12 ++++++++++++
 .../apache/cassandra/db/commitlog/CommitLogTest.java | 15 ++++++++++++++-
 3 files changed, 27 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

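The CommitLogReader change in this merge (CASSANDRA-13282) adds a guard before each mutation's size is read. If fewer than 4 bytes remain before the end of the section, the reader requests termination instead of calling readInt() and hitting EOF. The class below is a minimal sketch of that idea over a plain java.nio.ByteBuffer, not Cassandra code. It assumes a simplified entry layout of [size:int][payload] with no checksums, treats the buffer limit as the section end, and uses the hypothetical names SegmentTailSketch and readSizes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Cassandra code) of the CASSANDRA-13282 guard.
 * Assumes simplified [size:int][payload] entries with no checksums, and
 * treats the buffer's limit as the end of the section being read.
 */
public class SegmentTailSketch
{
    // Same value as LEGACY_END_OF_SEGMENT_MARKER in CommitLogReader.
    static final int END_OF_SEGMENT_MARKER = 0;

    /** Returns the payload sizes of the entries read before the end of the section. */
    static List<Integer> readSizes(ByteBuffer section)
    {
        List<Integer> sizes = new ArrayList<>();
        while (section.hasRemaining())
        {
            // Fewer than 4 bytes cannot hold a size int: stop here rather than
            // letting getInt() throw BufferUnderflowException (the EOF case).
            if (section.remaining() < 4)
                break;

            int size = section.getInt();
            // Zero padding written when the segment was created reads back as size 0.
            if (size == END_OF_SEGMENT_MARKER)
                break;
            if (size < 0 || size > section.remaining())
                throw new IllegalStateException("Truncated entry at " + (section.position() - 4));

            section.position(section.position() + size); // skip over the payload
            sizes.add(size);
        }
        return sizes;
    }

    public static void main(String[] args)
    {
        // Two entries (5 and 3 payload bytes) that end 2 bytes before the section end.
        ByteBuffer section = ByteBuffer.allocate(4 + 5 + 4 + 3 + 2);
        section.putInt(5).put(new byte[5]).putInt(3).put(new byte[3]);
        section.rewind();
        System.out.println(readSizes(section)); // prints [5, 3]
    }
}
```

Without the remaining() < 4 check, the example section throws BufferUnderflowException on the third getInt(). That exception is the ByteBuffer analogue of the EOFException that readSection would otherwise report through shouldSkipSegmentOnError.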

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 302a028,140c860..ab28dd4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -33,140 -43,6 +33,141 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

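From the 2.1 format onward (the default branch of CommitLogFormat), readSection reads each entry as four parts: a 4-byte size, a checksum of that size stored as an int and compared as an unsigned value, the serialized mutation, and a second checksum. The CRC32 is not reset between the two comparisons, so the second checksum covers the size bytes as well as the payload. The sketch below round-trips one entry through that layout with java.util.zip.CRC32. The names EntryFramingSketch, encode, and decode are hypothetical. Feeding the int to the CRC as four big-endian bytes is an assumption about what FBUtilities.updateChecksumInt does. The 10-byte minimum mirrors the check in readSection.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Hypothetical sketch of the assumed 2.1+ entry layout:
 * [size:int][crc(size):int][payload][crc(size + payload):int].
 */
public class EntryFramingSketch
{
    // Mirrors the "mutation must be at least 10 bytes" sanity check.
    static final int MIN_SERIALIZED_SIZE = 10;

    // Feed an int to the CRC as four big-endian bytes (assumed to match updateChecksumInt).
    static void updateChecksumInt(CRC32 crc, int v)
    {
        crc.update((v >>> 24) & 0xFF);
        crc.update((v >>> 16) & 0xFF);
        crc.update((v >>> 8) & 0xFF);
        crc.update(v & 0xFF);
    }

    static ByteBuffer encode(byte[] payload)
    {
        CRC32 crc = new CRC32();
        ByteBuffer out = ByteBuffer.allocate(4 + 4 + payload.length + 4);
        out.putInt(payload.length);
        updateChecksumInt(crc, payload.length);
        out.putInt((int) crc.getValue());        // checksum of the size field
        out.put(payload);
        crc.update(payload, 0, payload.length);  // continue the same CRC over the payload
        out.putInt((int) crc.getValue());        // checksum of size bytes + payload
        out.flip();
        return out;
    }

    /** Returns the payload, or null if the size is implausible or a checksum fails. */
    static byte[] decode(ByteBuffer in)
    {
        if (in.remaining() < 8)
            return null;
        int size = in.getInt();
        if (size < MIN_SERIALIZED_SIZE)
            return null;
        long claimedSizeCrc = in.getInt() & 0xffffffffL; // compare as unsigned
        CRC32 crc = new CRC32();
        updateChecksumInt(crc, size);
        if (crc.getValue() != claimedSizeCrc)
            return null;
        if (size > in.remaining() - 4)
            return null;                                 // entry runs past the buffer
        byte[] payload = new byte[size];
        in.get(payload);
        long claimedCrc = in.getInt() & 0xffffffffL;
        crc.update(payload, 0, size);                    // no reset: covers size bytes + payload
        return crc.getValue() == claimedCrc ? payload : null;
    }

    public static void main(String[] args)
    {
        byte[] payload = "mutation-bytes".getBytes(StandardCharsets.UTF_8);
        ByteBuffer entry = encode(payload);
        byte[] decoded = decode(entry.duplicate());
        System.out.println(decoded == null ? "checksum failure" : new String(decoded, StandardCharsets.UTF_8));

        // Flip one bit inside the payload (offset 9); the payload checksum no longer matches.
        entry.put(9, (byte) (entry.get(9) ^ 1));
        System.out.println(decode(entry) == null ? "checksum failure" : "unexpectedly valid");
    }
}
```

Running main prints "mutation-bytes" and then "checksum failure". Continuing the CRC rather than resetting it means a corrupted size field that happens to pass the first check is still very likely to fail the second.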
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index e6e2e1a,0000000..d1cb8d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,502 -1,0 +1,514 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.zip.CRC32;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 +
 +public class CommitLogReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
 +
 +    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 +
 +    @VisibleForTesting
 +    public static final int ALL_MUTATIONS = -1;
 +    private final CRC32 checksum;
 +    private final Map<UUID, AtomicInteger> invalidMutations;
 +
 +    private byte[] buffer;
 +
 +    public CommitLogReader()
 +    {
 +        checksum = new CRC32();
 +        invalidMutations = new HashMap<>();
 +        buffer = new byte[4096];
 +    }
 +
 +    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
 +    {
 +        return invalidMutations.entrySet();
 +    }
 +
 +    /**
 +     * Reads all passed in files with no minimum, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
 +    {
 +        readAllFiles(handler, files, CommitLogPosition.NONE);
 +    }
 +
 +    /**
 +     * Reads all passed in files with minPosition, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException
 +    {
 +        for (int i = 0; i < files.length; i++)
 +            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 == files.length);
 +    }
 +
 +    /**
 +     * Reads passed in file fully
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads passed in file fully, up to mutationLimit count
 +     */
 +    @VisibleForTesting
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads mutations from file, handing them off to handler
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param file CommitLogSegment file to read
 +     * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
 +     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
 +     * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found
 +     *
 +     * @throws IOException
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler,
 +                                     File file,
 +                                     CommitLogPosition minPosition,
 +                                     int mutationLimit,
 +                                     boolean tolerateTruncation) throws IOException
 +    {
 +        // just transform from the file name (no reading of headers) to determine version
 +        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
 +        try(RandomAccessReader reader = RandomAccessReader.open(file))
 +        {
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
 +            {
 +                if (!shouldSkipSegmentId(file, desc, minPosition))
 +                {
 +                    if (minPosition.segmentId == desc.id)
 +                        reader.seek(minPosition.position);
 +                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                    statusTracker.errorContext = desc.fileName();
 +                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc);
 +                }
 +                return;
 +            }
 +
 +            final long segmentIdFromFilename = desc.id;
 +            try
 +            {
 +                // The following call can either throw or legitimately return null. For either case, we need to check
 +                // desc outside this block and set it to null in the exception case.
 +                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            }
 +            catch (Exception e)
 +            {
 +                desc = null;
 +            }
 +            if (desc == null)
 +            {
 +                // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor.
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Could not read commit log descriptor in file %s", file),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
 +                    false));
 +                return;
 +            }
 +
 +            if (segmentIdFromFilename != desc.id)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
 +                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file),
 +                                                                                CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
 +                                                                                false)))
 +                {
 +                    return;
 +                }
 +            }
 +
 +            if (shouldSkipSegmentId(file, desc, minPosition))
 +                return;
 +
 +            CommitLogSegmentReader segmentReader;
 +            try
 +            {
 +                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
 +            }
 +            catch(Exception e)
 +            {
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Unable to create segment reader for commit log file: %s", e),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            try
 +            {
 +                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
 +                {
 +                    // Only tolerate truncation if we allow in both global and segment
 +                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
 +
 +                    // Skip segments that are completely behind the desired minPosition
 +                    if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position)
 +                        continue;
 +
 +                    statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName());
 +
 +                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc);
 +                    if (!statusTracker.shouldContinue())
 +                        break;
 +                }
 +            }
 +            // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException
 +            // is wrapping an IOException.
 +            catch (RuntimeException re)
 +            {
 +                if (re.getCause() instanceof IOException)
 +                    throw (IOException) re.getCause();
 +                throw re;
 +            }
 +            logger.debug("Finished reading {}", file);
 +        }
 +    }
 +
 +    /**
 +     * Any segment with id >= minPosition.segmentId is a candidate for read.
 +     */
 +    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition)
 +    {
 +        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
 +            file.getPath(),
 +            desc.version,
 +            desc.getMessagingVersion(),
 +            desc.compression);
 +
 +        if (minPosition.segmentId > desc.id)
 +        {
 +            logger.trace("Skipping read of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Reads a section of a file containing mutations
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param reader FileDataInput / logical buffer containing commitlog mutations
 +     * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations
 +     * @param end logical numeric end of the segment being read
 +     * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc
 +     * @param desc Descriptor for CommitLog serialization
 +     */
 +    private void readSection(CommitLogReadHandler handler,
 +                             FileDataInput reader,
 +                             CommitLogPosition minPosition,
 +                             int end,
 +                             ReadStatusTracker statusTracker,
 +                             CommitLogDescriptor desc) throws IOException
 +    {
 +        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment
 +        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
 +            reader.seek(minPosition.position);
 +
 +        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
 +        {
 +            long mutationStart = reader.getFilePointer();
 +            if (logger.isTraceEnabled())
 +                logger.trace("Reading mutation at {}", mutationStart);
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
++                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
++                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
++                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
++                // from the end of the file, which means that we'll be unable to read a full int and instead
++                // read an EOF here
++                if(end - reader.getFilePointer() < 4)
++                {
++                    logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing");
++                    statusTracker.requestTermination();
++                    return;
++                }
++
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // Mutation must be at LEAST 10 bytes:
 +                //    3 for a non-empty Keyspace
 +                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
 +                //    4 bytes for column count.
 +                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version);
 +                checksum.reset();
 +                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +
 +                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
 +            }
 +            catch (EOFException eof)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.EOF,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                return;
 +            }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                continue;
 +            }
 +
 +            long mutationPosition = reader.getFilePointer();
 +            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, desc);
 +
 +            // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that
 +            // are before this mark.
 +            if (mutationPosition >= minPosition.position)
 +                statusTracker.addProcessedMutation();
 +        }
 +    }
 +
 +    /**
 +     * Deserializes a Mutation and passes it to the requested CommitLogReadHandler
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param inputBuffer raw byte array w/Mutation data
 +     * @param size deserialized size of mutation
 +     * @param minPosition We need to suppress replay of mutations that are before the required minPosition
 +     * @param entryLocation filePointer offset of mutation within CommitLogSegment
 +     * @param desc CommitLogDescriptor being worked on
 +     */
 +    @VisibleForTesting
 +    protected void readMutation(CommitLogReadHandler handler,
 +                                byte[] inputBuffer,
 +                                int size,
 +                                CommitLogPosition minPosition,
 +                                final int entryLocation,
 +                                final CommitLogDescriptor desc) throws IOException
 +    {
 +        // For now, we need to go through the motions of deserializing the mutation to determine its size and move
 +        // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment.
 +        boolean shouldReplay = entryLocation > minPosition.position;
 +
 +        final Mutation mutation;
 +        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 +        {
 +            mutation = Mutation.serializer.deserialize(bufIn,
 +                                                       desc.getMessagingVersion(),
 +                                                       SerializationHelper.Flag.LOCAL);
 +            // doublecheck that what we read is still valid for the current schema
 +            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 +                upd.validate();
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +
 +            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +
 +            // Checksum passed so this error can't be permissible.
 +            handler.handleUnrecoverableError(new CommitLogReadException(
 +                String.format(
 +                    "Unexpected error deserializing mutation; saved to %s.  " +
 +                    "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
 +                    "Exception follows: %s", f.getAbsolutePath(), t),
 +                CommitLogReadErrorReason.MUTATION_ERROR,
 +                false));
 +            return;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
 +                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}");
 +
 +        if (shouldReplay)
 +            handler.handleMutation(mutation, size, entryLocation, desc);
 +    }
 +
 +    /**
 +     * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
 +     */
 +    private static class CommitLogFormat
 +    {
 +        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +
 +        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                    checksum.update(serializedSize);
 +                    break;
 +                // Changed format in 2.0
 +                default:
 +                    updateChecksumInt(checksum, serializedSize);
 +                    break;
 +            }
 +        }
 +
 +        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +    }
 +
 +    private static class ReadStatusTracker
 +    {
 +        private int mutationsLeft;
 +        public String errorContext = "";
 +        public boolean tolerateErrorsInSection;
 +        private boolean error;
 +
 +        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
 +        {
 +            this.mutationsLeft = mutationLimit;
 +            this.tolerateErrorsInSection = tolerateErrorsInSection;
 +        }
 +
 +        public void addProcessedMutation()
 +        {
 +            if (mutationsLeft == ALL_MUTATIONS)
 +                return;
 +            --mutationsLeft;
 +        }
 +
 +        public boolean shouldContinue()
 +        {
 +            return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
 +        }
 +
 +        public void requestTermination()
 +        {
 +            error = true;
 +        }
 +    }
 +}
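
The CommitLogFormat helpers above read a claimed checksum whose width depends on the commit log version. Below is a minimal sketch of the newer layout, assuming an entry header that stores the serialized size followed by a 4-byte CRC32 of that size. `SizeChecksumSketch`, `roundTrip` and `legacyCollides` are hypothetical names, and the local `updateChecksumInt` is an illustrative stand-in, not Cassandra's own helper. The sketch shows why the 4-byte value is masked with `0xffffffffL`: a CRC at or above 2^31 would otherwise sign-extend and never match `CRC32.getValue()`. It also shows that `CRC32.update(int)`, the call used in the VERSION_12 branch, hashes only the low eight bits of its argument.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class SizeChecksumSketch
{
    // Hypothetical stand-in for updateChecksumInt: feeds all four bytes of v
    // into the checksum, most significant byte first.
    static void updateChecksumInt(CRC32 checksum, int v)
    {
        checksum.update((v >>> 24) & 0xFF);
        checksum.update((v >>> 16) & 0xFF);
        checksum.update((v >>> 8) & 0xFF);
        checksum.update(v & 0xFF);
    }

    // Writes [size][crc as 4 bytes], then reads it back the way the post-2.1
    // branch does: the stored int is widened to an unsigned long before comparing.
    static boolean roundTrip(int serializedSize)
    {
        CRC32 writer = new CRC32();
        updateChecksumInt(writer, serializedSize);
        ByteBuffer header = ByteBuffer.allocate(2 * Integer.BYTES);
        header.putInt(serializedSize).putInt((int) writer.getValue());
        header.flip();

        CRC32 reader = new CRC32();
        updateChecksumInt(reader, header.getInt());
        long claimed = header.getInt() & 0xffffffffL;
        return claimed == reader.getValue();
    }

    // CRC32.update(int) hashes only the low eight bits, so two sizes that
    // differ only above the lowest byte produce the same checksum.
    static boolean legacyCollides(int a, int b)
    {
        CRC32 x = new CRC32();
        x.update(a);
        CRC32 y = new CRC32();
        y.update(b);
        return x.getValue() == y.getValue();
    }

    public static void main(String[] args)
    {
        System.out.println(roundTrip(1234));                  // true
        System.out.println(legacyCollides(1234, 1234 + 256)); // true
    }
}
```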

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2399d4d/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 5476d03,90dc258..4000fbf
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -171,10 -143,23 +171,23 @@@ public class CommitLogTes
      }
  
      @Test
+     public void testRecoveryWithShortPadding() throws Exception
+     {
 -        // If we have 0-3 bytes remaining, commitlog replayer
 -        // should pass, because there's insufficient room
 -        // left in the segment for the legacy size marker.
 -        testRecovery(new byte[1], null);
 -        testRecovery(new byte[2], null);
 -        testRecovery(new byte[3], null);
++            // If we have 0-3 bytes remaining, commitlog replayer
++            // should pass, because there's insufficient room
++            // left in the segment for the legacy size marker.
++            testRecovery(new byte[1], null);
++            testRecovery(new byte[2], null);
++            testRecovery(new byte[3], null);
+     }
+ 
+     @Test
      public void testRecoveryWithShortSize() throws Exception
      {
+         byte[] data = new byte[5];
+         data[3] = 1; // Not a legacy marker, give it a fake (short) size
          runExpecting(() -> {
-             testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+             testRecovery(data, CommitLogDescriptor.VERSION_20);
              return null;
          }, CommitLogReplayException.class);
      }
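
testRecoveryWithShortPadding covers the CASSANDRA-13282 case. When only 0-3 bytes remain at the end of a segment, there is no room for the legacy size marker, so replay should stop cleanly instead of failing. Below is a minimal sketch of that end-of-segment check, assuming the legacy marker is a zero-valued 4-byte size. `ShortPaddingSketch`, `readSizeOrEnd` and both constants are hypothetical and do not mirror CommitLogReader's actual code. The sketch does not model the CommitLogReplayException that testRecoveryWithShortSize expects. It only shows that the test's five bytes are read as a size of 1 rather than treated as padding.

```java
import java.nio.ByteBuffer;

class ShortPaddingSketch
{
    // Assumption: legacy segments mark their end with a zero-valued size int.
    static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
    // Hypothetical sentinel returned when the reader should stop.
    static final int END_OF_SEGMENT = -1;

    // Returns the next mutation size, or END_OF_SEGMENT if the remaining bytes
    // cannot hold a size int (0-3 bytes of padding) or hold the legacy marker.
    static int readSizeOrEnd(ByteBuffer tail)
    {
        if (tail.remaining() < Integer.BYTES)
            return END_OF_SEGMENT;
        int size = tail.getInt(); // big-endian by default
        return size == LEGACY_END_OF_SEGMENT_MARKER ? END_OF_SEGMENT : size;
    }

    public static void main(String[] args)
    {
        for (int padding = 0; padding <= 3; padding++)
            System.out.println(padding + " bytes -> " + readSizeOrEnd(ByteBuffer.wrap(new byte[padding])));

        byte[] data = new byte[5];
        data[3] = 1; // same bytes as testRecoveryWithShortSize
        System.out.println("short size -> " + readSizeOrEnd(ByteBuffer.wrap(data)));
    }
}
```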
