Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Mon May 9 07:05:55 2011 @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.commitlog; - -import java.io.*; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import org.apache.cassandra.io.ICompactSerializer2; -import org.apache.cassandra.io.util.FileUtils; - -public class CommitLogHeader -{ - public static String getHeaderPathFromSegment(CommitLogSegment segment) - { - return getHeaderPathFromSegmentPath(segment.getPath()); - } - - public static String getHeaderPathFromSegmentPath(String segmentPath) - { - return segmentPath + ".header"; - } - - public static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer(); - - private Map<Integer, Integer> cfDirtiedAt; // position at which each CF was last flushed - - CommitLogHeader() - { - this(new HashMap<Integer, Integer>()); - } - - /* - * This ctor is used while deserializing. This ctor - * also builds an index of position to column family - * Id. - */ - private CommitLogHeader(Map<Integer, Integer> cfDirtiedAt) - { - this.cfDirtiedAt = cfDirtiedAt; - } - - boolean isDirty(Integer cfId) - { - return cfDirtiedAt.containsKey(cfId); - } - - int getPosition(Integer cfId) - { - Integer x = cfDirtiedAt.get(cfId); - return x == null ? 0 : x; - } - - void turnOn(Integer cfId, long position) - { - assert position >= 0 && position <= Integer.MAX_VALUE; - cfDirtiedAt.put(cfId, (int)position); - } - - void turnOff(Integer cfId) - { - cfDirtiedAt.remove(cfId); - } - - boolean isSafeToDelete() throws IOException - { - return cfDirtiedAt.isEmpty(); - } - - // we use cf ids. getting the cf names would be pretty pretty expensive. - public String toString() - { - StringBuilder sb = new StringBuilder(""); - sb.append("CLH(dirty+flushed={"); - for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet()) - { - sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", "); - } - sb.append("})"); - return sb.toString(); - } - - public String dirtyString() - { - StringBuilder sb = new StringBuilder(); - for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet()) - sb.append(entry.getKey()).append(", "); - return sb.toString(); - } - - static void writeCommitLogHeader(CommitLogHeader header, String headerFile) throws IOException - { - DataOutputStream out = null; - try - { - /* - * FileOutputStream doesn't sync on flush/close. - * As headers are "optional" now there is no reason to sync it. - * This provides nearly double the performance of BRAF, more under heavey load. - */ - out = new DataOutputStream(new FileOutputStream(headerFile)); - serializer.serialize(header, out); - } - finally - { - if (out != null) - out.close(); - } - } - - static CommitLogHeader readCommitLogHeader(String headerFile) throws IOException - { - DataInputStream reader = null; - try - { - reader = new DataInputStream(new BufferedInputStream(new FileInputStream(headerFile))); - return serializer.deserialize(reader); - } - finally - { - FileUtils.closeQuietly(reader); - } - } - - int getReplayPosition() - { - return cfDirtiedAt.isEmpty() ? -1 : Collections.min(cfDirtiedAt.values()); - } - - static class CommitLogHeaderSerializer implements ICompactSerializer2<CommitLogHeader> - { - public void serialize(CommitLogHeader clHeader, DataOutput dos) throws IOException - { - Checksum checksum = new CRC32(); - - // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data. - dos.writeInt(clHeader.cfDirtiedAt.size()); // 4 - checksum.update(clHeader.cfDirtiedAt.size()); - dos.writeLong(checksum.getValue()); - - // write the 2nd checksum after the lastflushedat map - for (Map.Entry<Integer, Integer> entry : clHeader.cfDirtiedAt.entrySet()) - { - dos.writeInt(entry.getKey()); // 4 - checksum.update(entry.getKey()); - dos.writeInt(entry.getValue()); // 4 - checksum.update(entry.getValue()); - } - dos.writeLong(checksum.getValue()); - } - - public CommitLogHeader deserialize(DataInput dis) throws IOException - { - Checksum checksum = new CRC32(); - - int lastFlushedAtSize = dis.readInt(); - checksum.update(lastFlushedAtSize); - if (checksum.getValue() != dis.readLong()) - { - throw new IOException("Invalid or corrupt commitlog header"); - } - Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>(); - for (int i = 0; i < lastFlushedAtSize; i++) - { - int key = dis.readInt(); - checksum.update(key); - int value = dis.readInt(); - checksum.update(value); - lastFlushedAt.put(key, value); - } - if (checksum.getValue() != dis.readLong()) - { - throw new IOException("Invalid or corrupt commitlog header"); - } - - return new CommitLogHeader(lastFlushedAt); - } - } -}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Mon May 9 07:05:55 2011 @@ -23,6 +23,10 @@ package org.apache.cassandra.db.commitlo import java.io.File; import java.io.IOError; import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -30,30 +34,30 @@ import org.apache.cassandra.net.Messagin import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.io.util.BufferedRandomAccessFile; public class CommitLogSegment { private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); + private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log"); + public final long id; private final BufferedRandomAccessFile logWriter; - private final CommitLogHeader header; + + // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment + private Set<Integer> cfDirty = new HashSet<Integer>(); public CommitLogSegment() { - this.header = new CommitLogHeader(); - String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log"; + id = System.currentTimeMillis(); + String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log"; logger.info("Creating new commitlog segment " + logFile); try { logWriter = createWriter(logFile); - - writeHeader(); } catch (IOException e) { @@ -61,14 +65,26 @@ public class CommitLogSegment } } - public static boolean possibleCommitLogFile(String filename) + // assume filename is a 'possibleCommitLogFile()' + public static long idFromFilename(String filename) { - return filename.matches("CommitLog-\\d+.log"); + Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename); + try + { + if (matcher.matches()) + return Long.valueOf(matcher.group(1)); + else + return -1L; + } + catch (NumberFormatException e) + { + return -1L; + } } - public void writeHeader() throws IOException + public static boolean possibleCommitLogFile(String filename) { - CommitLogHeader.writeCommitLogHeader(header, getHeaderPath()); + return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); } private static BufferedRandomAccessFile createWriter(String file) throws IOException @@ -76,35 +92,14 @@ public class CommitLogSegment return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024, true); } - public CommitLogSegment.CommitLogContext write(RowMutation rowMutation) throws IOException + public ReplayPosition write(RowMutation rowMutation) throws IOException { long currentPosition = -1L; try { currentPosition = logWriter.getFilePointer(); - CommitLogSegment.CommitLogContext cLogCtx = new CommitLogSegment.CommitLogContext(currentPosition); - - // update header - for (ColumnFamily columnFamily : rowMutation.getColumnFamilies()) - { - // we can ignore the serialized map in the header (and avoid deserializing it) since we know we are - // writing the cfs as they exist now. check for null cfm in case a cl write goes through after the cf is - // defined but before a new segment is created. - CFMetaData cfm = DatabaseDescriptor.getCFMetaData(columnFamily.id()); - if (cfm == null) - { - logger.error("Attempted to write commit log entry for unrecognized column family: " + columnFamily.id()); - } - else - { - Integer id = cfm.cfId; - if (!header.isDirty(id)) - { - header.turnOn(id, logWriter.getFilePointer()); - writeHeader(); - } - } - } + assert currentPosition <= Integer.MAX_VALUE; + ReplayPosition cLogCtx = new ReplayPosition(id, (int) currentPosition); // write mutation, w/ checksum on the size and data Checksum checksum = new CRC32(); @@ -131,14 +126,11 @@ public class CommitLogSegment logWriter.sync(); } - public CommitLogContext getContext() - { - return new CommitLogContext(logWriter.getFilePointer()); - } - - public CommitLogHeader getHeader() + public ReplayPosition getContext() { - return header; + long position = logWriter.getFilePointer(); + assert position <= Integer.MAX_VALUE; + return new ReplayPosition(id, (int) position); } public String getPath() @@ -146,9 +138,9 @@ public class CommitLogSegment return logWriter.getPath(); } - public String getHeaderPath() + public String getName() { - return CommitLogHeader.getHeaderPathFromSegment(this); + return logWriter.getPath().substring(logWriter.getPath().lastIndexOf(File.separator) + 1); } public long length() @@ -175,34 +167,34 @@ public class CommitLogSegment } } - @Override - public String toString() + void turnOn(Integer cfId) { - return "CommitLogSegment(" + logWriter.getPath() + ')'; + cfDirty.add(cfId); } - public class CommitLogContext + void turnOff(Integer cfId) { - public final long position; + cfDirty.remove(cfId); + } - public CommitLogContext(long position) - { - assert position >= 0; - this.position = position; - } + // For debugging, not fast + String dirtyString() + { + StringBuilder sb = new StringBuilder(); + for (Integer cfId : cfDirty) + sb.append(DatabaseDescriptor.getCFMetaData(cfId).cfName).append(" (").append(cfId).append("), "); + return sb.toString(); + } - public CommitLogSegment getSegment() - { - return CommitLogSegment.this; - } + boolean isSafeToDelete() + { + return cfDirty.isEmpty(); + } - @Override - public String toString() - { - return "CommitLogContext(" + - "file='" + logWriter.getPath() + '\'' + - ", position=" + position + - ')'; - } + @Override + public String toString() + { + return "CommitLogSegment(" + logWriter.getPath() + ')'; } + } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java Mon May 9 07:05:55 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.sql.Types; import org.apache.cassandra.db.Column; import org.apache.cassandra.db.context.CounterContext; @@ -52,4 +53,39 @@ public abstract class AbstractCommutativ { return Long.class; } + + public boolean isSigned() + { + return true; + } + + public boolean isCaseSensitive() + { + return false; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(Long obj) + { + return obj.toString().length(); + } + + public int getScale(Long obj) + { + return 0; + } + + public int getJdbcType() + { + return Types.INTEGER; + } + + public boolean needsQuotes() + { + return false; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Mon May 9 07:05:55 2011 @@ -147,4 +147,16 @@ public abstract class AbstractType<T> im /** returns the class this AbstractType represents. */ public abstract Class<T> getType(); + + // + // JDBC metadata + // + + public abstract boolean isSigned(); + public abstract boolean isCaseSensitive(); + public abstract boolean isCurrency(); + public abstract int getPrecision(T obj); + public abstract int getScale(T obj); + public abstract int getJdbcType(); + public abstract boolean needsQuotes(); } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Mon May 9 07:05:55 2011 @@ -23,6 +23,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; +import java.sql.Types; import com.google.common.base.Charsets; @@ -86,4 +87,39 @@ public class AsciiType extends AbstractT { return String.class; } + + public boolean isSigned() + { + return false; + } + + public boolean isCaseSensitive() + { + return true; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(String obj) + { + return -1; + } + + public int getScale(String obj) + { + return -1; + } + + public int getJdbcType() + { + return Types.VARCHAR; + } + + public boolean needsQuotes() + { + return true; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java Mon May 9 07:05:55 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.sql.Types; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -88,4 +89,39 @@ public class BytesType extends AbstractT { return ByteBuffer.class; } + + public boolean isSigned() + { + return false; + } + + public boolean isCaseSensitive() + { + return false; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(ByteBuffer obj) + { + return -1; + } + + public int getScale(ByteBuffer obj) + { + return -1; + } + + public int getJdbcType() + { + return Types.BINARY; + } + + public boolean needsQuotes() + { + return true; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java Mon May 9 07:05:55 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra.db.marshal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.sql.Types; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.thrift.TBaseHelper; @@ -169,4 +170,39 @@ public final class IntegerType extends A { return BigInteger.class; } + + public boolean isSigned() + { + return true; + } + + public boolean isCaseSensitive() + { + return false; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(BigInteger obj) + { + return obj.toString().length(); + } + + public int getScale(BigInteger obj) + { + return 0; + } + + public int getJdbcType() + { + return Types.BIGINT; + } + + public boolean needsQuotes() + { + return false; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java Mon May 9 07:05:55 2011 @@ -27,7 +27,7 @@ import java.util.UUID; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; -public class LexicalUUIDType extends AbstractType<UUID> +public class LexicalUUIDType extends AbstractUUIDType { public static final LexicalUUIDType instance = new LexicalUUIDType(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java Mon May 9 07:05:55 2011 @@ -77,4 +77,39 @@ public class LocalByPartionerType<T exte { return ByteBuffer.class; } + + public boolean isSigned() + { + throw new UnsupportedOperationException(); + } + + public boolean isCaseSensitive() + { + throw new UnsupportedOperationException(); + } + + public boolean isCurrency() + { + throw new UnsupportedOperationException(); + } + + public int getPrecision(ByteBuffer obj) + { + throw new UnsupportedOperationException(); + } + + public int getScale(ByteBuffer obj) + { + throw new UnsupportedOperationException(); + } + + public int getJdbcType() + { + throw new UnsupportedOperationException(); + } + + public boolean needsQuotes() + { + throw new UnsupportedOperationException(); + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java Mon May 9 07:05:55 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.sql.Types; import org.apache.cassandra.utils.ByteBufferUtil; @@ -109,4 +110,39 @@ public class LongType extends AbstractTy { return Long.class; } + + public boolean isSigned() + { + return true; + } + + public boolean isCaseSensitive() + { + return false; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(Long obj) + { + return obj.toString().length(); + } + + public int getScale(Long obj) + { + return 0; + } + + public int getJdbcType() + { + return Types.INTEGER; + } + + public boolean needsQuotes() + { + return false; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java Mon May 9 07:05:55 2011 @@ -31,7 +31,7 @@ import org.apache.cassandra.utils.FBUtil import org.apache.cassandra.utils.UUIDGen; import org.apache.commons.lang.time.DateUtils; -public class TimeUUIDType extends AbstractType<UUID> +public class TimeUUIDType extends AbstractUUIDType { public static final TimeUUIDType instance = new TimeUUIDType(); Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java Mon May 9 07:05:55 2011 @@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; +import java.sql.Types; import com.google.common.base.Charsets; import org.apache.cassandra.utils.ByteBufferUtil; @@ -196,4 +197,39 @@ public class UTF8Type extends AbstractTy { return String.class; } + + public boolean isSigned() + { + return false; + } + + public boolean isCaseSensitive() + { + return true; + } + + public boolean isCurrency() + { + return false; + } + + public int getPrecision(String obj) + { + return -1; + } + + public int getScale(String obj) + { + return -1; + } + + public int getJdbcType() + { + return Types.VARCHAR; + } + + public boolean needsQuotes() + { + return true; + } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java Mon May 9 07:05:55 2011 @@ -43,7 +43,7 @@ import org.apache.cassandra.utils.UUIDGe * @see "com.fasterxml.uuid.UUIDComparator" * */ -public class UUIDType extends AbstractType<UUID> +public class UUIDType extends AbstractUUIDType { public static final UUIDType instance = new UUIDType(); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Mon May 9 07:05:55 2011 @@ -38,7 +38,7 @@ import org.apache.cassandra.utils.Pair; public class Descriptor { public static final String LEGACY_VERSION = "a"; - public static final String CURRENT_VERSION = "f"; + public static final String CURRENT_VERSION = "g"; public final File directory; public final String version; @@ -80,6 +80,11 @@ public class Descriptor isLatestVersion = version.compareTo(CURRENT_VERSION) == 0; } + public boolean hasReplayPosition() + { + return version.compareTo("g") >= 0; + } + public String filenameFor(Component component) { return filenameFor(component.name()); Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Mon May 9 07:05:55 2011 @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.util.FileUtils; @@ -67,42 +68,53 @@ public abstract class SSTable public final CFMetaData metadata; public final IPartitioner partitioner; + public final ReplayPosition replayPosition; + protected final EstimatedHistogram estimatedRowSize; protected final EstimatedHistogram estimatedColumnCount; - protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) + protected SSTable(Descriptor descriptor, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner) { - this(descriptor, new HashSet<Component>(), metadata, partitioner); + this(descriptor, new HashSet<Component>(), metadata, replayPosition, partitioner); } - protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) - { - this(descriptor, components, metadata, partitioner, defaultRowHistogram(), defaultColumnHistogram()); - } - - static EstimatedHistogram defaultColumnHistogram() + protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner) { - return new EstimatedHistogram(114); + this(descriptor, components, metadata, replayPosition, partitioner, defaultRowHistogram(), defaultColumnHistogram()); } - static EstimatedHistogram defaultRowHistogram() + protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts) { - return new EstimatedHistogram(150); - } + assert descriptor != null; + assert components != null; + assert metadata != null; + assert replayPosition != null; + assert partitioner != null; + assert rowSizes != null; + assert columnCounts != null; - protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts) - { this.descriptor = descriptor; Set<Component> dataComponents = new HashSet<Component>(components); for (Component component : components) assert component.type != Component.Type.COMPACTED_MARKER; this.components = Collections.unmodifiableSet(dataComponents); this.metadata = metadata; + this.replayPosition = replayPosition; this.partitioner = partitioner; estimatedRowSize = rowSizes; estimatedColumnCount = columnCounts; } + static EstimatedHistogram defaultColumnHistogram() + { + return new EstimatedHistogram(114); + } + + static EstimatedHistogram defaultRowHistogram() + { + return new EstimatedHistogram(150); + } + public EstimatedHistogram getEstimatedRowSize() { return estimatedRowSize; Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon May 9 07:05:55 2011 @@ -34,6 +34,7 @@ import org.apache.cassandra.cache.Instru import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.AbstractBounds; @@ -156,15 +157,18 @@ public class SSTableReader extends SSTab EstimatedHistogram rowSizes; EstimatedHistogram columnCounts; File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS)); + ReplayPosition rp = ReplayPosition.NONE; if (statsFile.exists()) { DataInputStream dis = null; try { - logger.debug("Load statistics for {}", descriptor); + logger.debug("Load metadata for {}", descriptor); dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile))); rowSizes = EstimatedHistogram.serializer.deserialize(dis); columnCounts = EstimatedHistogram.serializer.deserialize(dis); + if (descriptor.hasReplayPosition()) + rp = ReplayPosition.serializer.deserialize(dis); } finally { @@ -178,7 +182,7 @@ public class SSTableReader extends SSTab columnCounts = SSTable.defaultColumnHistogram(); } - SSTableReader sstable = new SSTableReader(descriptor, components, metadata, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts); + SSTableReader sstable = new SSTableReader(descriptor, components, metadata, rp, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts); sstable.setTrackedBy(tracker); // versions before 'c' encoded keys as utf-16 before hashing to the filter @@ -203,16 +207,17 @@ public class SSTableReader extends SSTab /** * Open a RowIndexedReader which already has its state initialized (by SSTableWriter). */ - static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize, + static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize, EstimatedHistogram columncount) throws IOException { assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null; - return new SSTableReader(desc, components, metadata, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount); + return new SSTableReader(desc, components, metadata, replayPosition, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount); } private SSTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, + ReplayPosition replayPosition, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, @@ -223,7 +228,7 @@ public class SSTableReader extends SSTab EstimatedHistogram columnCounts) throws IOException { - super(desc, components, metadata, partitioner, rowSizes, columnCounts); + super(desc, components, metadata, replayPosition, partitioner, rowSizes, columnCounts); this.maxDataAge = maxDataAge; this.ifile = ifile; Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon May 9 07:05:55 2011 @@ -28,6 +28,7 @@ import java.util.Set; import com.google.common.collect.Sets; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.io.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; @@ -62,14 +63,15 @@ public class SSTableWriter extends SSTab public SSTableWriter(String filename, long keyCount) throws IOException { - this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner()); + this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), ReplayPosition.NONE); } - public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner) throws IOException + public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner, ReplayPosition replayPosition) throws IOException { super(Descriptor.fromFilename(filename), new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)), metadata, + replayPosition, partitioner, SSTable.defaultRowHistogram(), SSTable.defaultColumnHistogram()); @@ -182,7 +184,7 @@ public class SSTableWriter extends SSTab FileUtils.truncate(dataFile.getPath(), position); // write sstable statistics - writeStatistics(descriptor, estimatedRowSize, estimatedColumnCount); + writeMetadata(descriptor, estimatedRowSize, estimatedColumnCount, replayPosition); // remove the 'tmp' marker from all components final Descriptor newdesc = rename(descriptor, components); @@ -190,20 +192,21 @@ public class SSTableWriter extends SSTab // finalize in-memory state for the reader SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX)); SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA)); - SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount); + SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, replayPosition, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount); iwriter = null; dbuilder = null; return sstable; } - private static void writeStatistics(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnnCounts) throws IOException + private static void writeMetadata(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition rp) throws IOException { BufferedRandomAccessFile out = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_STATS)), "rw", BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE, true); EstimatedHistogram.serializer.serialize(rowSizes, out); - EstimatedHistogram.serializer.serialize(columnnCounts, out); + EstimatedHistogram.serializer.serialize(columnCounts, out); + ReplayPosition.serializer.serialize(rp, out); out.close(); } @@ -457,7 +460,7 @@ public class SSTableWriter extends SSTab rows++; } - writeStatistics(desc, rowSizes, columnCounts); + writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE); return rows; } } @@ -527,7 +530,7 @@ public class SSTableWriter extends SSTab rows++; } - writeStatistics(desc, rowSizes, columnCounts); + writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE); if (writerDfile.getFilePointer() != dfile.getFilePointer()) { Modified: cassandra/trunk/test/system/test_cql.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/test/system/test_cql.py (original) +++ cassandra/trunk/test/system/test_cql.py Mon May 9 07:05:55 2011 @@ -152,29 +152,68 @@ class TestCql(ThriftTester): def test_select_row_range(self): "retrieve a range of rows with columns" cursor = init() - cursor.execute(""" - SELECT 4 FROM StandardLongA WHERE KEY > 'ad' AND KEY < 'ag'; - """) - rows = [row[0] for row in cursor.fetchall()] - assert ['ad', 'ae', 'af', 'ag'] == rows, rows - def test_select_row_range_with_limit(self): - "retrieve a limited range of rows with columns" - cursor = init() - cursor.execute(""" - SELECT 1,5,9 FROM StandardLongA WHERE KEY > 'aa' - AND KEY < 'ag' LIMIT 3 - """) - assert cursor.rowcount == 3 - - cursor.execute(""" - SELECT 20,40 FROM StandardIntegerA WHERE KEY > 'k1' - AND KEY < 'k7' LIMIT 5 - """) - assert cursor.rowcount == 5 - for i in range(5): - r = cursor.fetchone() - assert r[0] == "k%d" % (i+1) + # everything + cursor.execute("SELECT * FROM StandardLongA") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys + + # [start, end], mid-row + cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'ad' AND KEY <= 'ag'") + keys = [row[0] for row in cursor.fetchall()] + assert ['ad', 'ae', 'af', 'ag'] == keys, keys + + # (start, end), mid-row + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'ad' AND KEY < 'ag'") + keys = [row[0] for row in cursor.fetchall()] + assert ['ae', 'af'] == keys, keys + + # [start, end], full-row + cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'aa' AND KEY <= 'ag'") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys + + # (start, end), full-row + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' AND KEY < 'g'") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys + + # LIMIT tests + + # no WHERE + cursor.execute("SELECT * FROM StandardLongA LIMIT 1") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa'] == keys, keys + + # with >=, non-existing key + cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'a' LIMIT 1") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa'] == keys, keys + + # with >=, existing key + cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'aa' LIMIT 1") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa'] == keys, keys + + # with >, non-existing key + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' LIMIT 1") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa'] == keys, keys + + # with >, existing key + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'aa' LIMIT 1") + keys = [row[0] for row in cursor.fetchall()] + assert ['ab'] == keys, keys + + # with both > and <, existing keys + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'aa' and KEY < 'ag' LIMIT 5") + keys = [row[0] for row in cursor.fetchall()] + assert ['ab', 'ac', 'ad', 'ae', 'af'] == keys, keys + + # with both > and <, non-existing keys + cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' and KEY < 'b' LIMIT 5") + keys = [row[0] for row in cursor.fetchall()] + assert ['aa', 'ab', 'ac', 'ad', 'ae'] == keys, keys def test_select_columns_slice(self): "column slice tests" @@ -287,7 +326,7 @@ class TestCql(ThriftTester): cursor = init() cursor.execute(""" SELECT 'birthdate' FROM IndexedA WHERE 'birthdate' = 100 - AND KEY > 'asmithZ' + AND KEY >= 'asmithZ' """) assert cursor.rowcount == 1 r = cursor.fetchone() Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Mon May 9 07:05:55 2011 @@ -27,36 +27,15 @@ import org.junit.Test; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogHeader; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.utils.Pair; public class CommitLogTest extends CleanupHelper { @Test - public void testRecoveryWithEmptyHeader() throws Exception - { - testRecovery(new byte[0], new byte[10]); - } - - @Test - public void testRecoveryWithShortHeader() throws Exception - { - testRecovery(new byte[2], new byte[10]); - } - - @Test - public void testRecoveryWithGarbageHeader() throws Exception - { - byte[] garbage = new byte[100]; - (new java.util.Random()).nextBytes(garbage); - testRecovery(garbage, garbage); - } - - @Test public void testRecoveryWithEmptyLog() throws Exception { - CommitLog.recover(new File[] {tmpFiles().right}); + CommitLog.recover(new File[] {tmpFile()}); } @Test @@ -69,13 +48,13 @@ public class CommitLogTest extends Clean @Test public void testRecoveryWithShortSize() throws Exception { - testRecovery(new byte[0], new byte[2]); + testRecovery(new byte[2]); } @Test public void testRecoveryWithShortCheckSum() throws Exception { - testRecovery(new byte[0], new byte[6]); + testRecovery(new byte[6]); } @Test @@ -83,7 +62,7 @@ public class CommitLogTest extends Clean { byte[] garbage = new byte[100]; (new java.util.Random()).nextBytes(garbage); - testRecovery(new byte[0], garbage); + testRecovery(garbage); } @Test @@ -108,30 +87,6 @@ public class CommitLogTest extends Clean testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF } - @Test - public void testRecoveryWithHeaderPositionGreaterThanLogLength() throws Exception - { - // Note: this can actually happen (in periodic mode) when data is flushed - // before it had time to hit the commitlog (since the header is flushed by the system) - // see https://issues.apache.org/jira/browse/CASSANDRA-2285 - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(out); - Checksum checksum = new CRC32(); - - // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data. - dos.writeInt(1); - checksum.update(1); - dos.writeLong(checksum.getValue()); - dos.writeInt(0); - checksum.update(0); - dos.writeInt(200); - checksum.update(200); - dos.writeLong(checksum.getValue()); - dos.close(); - - testRecovery(out.toByteArray(), new byte[0]); - } - protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception { Checksum checksum = new CRC32(); @@ -147,29 +102,22 @@ public class CommitLogTest extends Clean dout.writeLong(checksum); dout.write(new byte[dataSize]); dout.close(); - testRecovery(new byte[0], out.toByteArray()); + testRecovery(out.toByteArray()); } - protected Pair<File, File> tmpFiles() throws IOException + protected File tmpFile() throws IOException { File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null); - File headerFile = new File(CommitLogHeader.getHeaderPathFromSegmentPath(logFile.getAbsolutePath())); logFile.deleteOnExit(); - headerFile.deleteOnExit(); assert logFile.length() == 0; - assert headerFile.length() == 0; - return new Pair<File, File>(headerFile, logFile); + return logFile; } - protected void testRecovery(byte[] headerData, byte[] logData) throws Exception + protected void testRecovery(byte[] logData) throws Exception { - Pair<File, File> tmpFiles = tmpFiles(); - File logFile = tmpFiles.right; - File headerFile = tmpFiles.left; + File logFile = tmpFile(); OutputStream lout = new FileOutputStream(logFile); - OutputStream hout = new FileOutputStream(headerFile); lout.write(logData); - hout.write(headerData); //statics make it annoying to test things correctly CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ } Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java Mon May 9 07:05:55 2011 @@ -223,7 +223,7 @@ public class CompactionsPurgeTest extend // Check that the second insert did went in ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName))); - assert cf.getColumnCount() == 10; + assertEquals(10, cf.getColumnCount()); for (IColumn c : cf) assert !c.isMarkedForDelete(); } Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=1100900&r1=1100899&r2=1100900&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Mon May 9 07:05:55 2011 @@ -59,17 +59,12 @@ public class RecoveryManager2Test extend logger.debug("forcing flush"); cfs.forceBlockingFlush(); - // remove Standard1 SSTable/MemTables - cfs.clearUnsafe(); - logger.debug("begin manual replay"); - // replay the commit log (nothing should be replayed since everything was flushed) + // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2 + // will be replayed) CommitLog.instance.resetUnsafe(); - CommitLog.recover(); - - // since everything that was flushed was removed (i.e. clearUnsafe) - // and the commit shouldn't have replayed anything, there should be no data - assert Util.getRangeSlice(cfs).isEmpty(); + int replayed = CommitLog.recover(); + assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed; } private void insertRow(String cfname, String key) throws IOException
