Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogHeader.java Mon May  9 07:05:55 2011
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.commitlog;
-
-import java.io.*;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.io.util.FileUtils;
-
-public class CommitLogHeader
-{
-    public static String getHeaderPathFromSegment(CommitLogSegment segment)
-    {
-        return getHeaderPathFromSegmentPath(segment.getPath());
-    }
-
-    public static String getHeaderPathFromSegmentPath(String segmentPath)
-    {
-        return segmentPath + ".header";
-    }
-
-    public static CommitLogHeaderSerializer serializer = new CommitLogHeaderSerializer();
-
-    private Map<Integer, Integer> cfDirtiedAt; // position at which each CF was last flushed
-
-    CommitLogHeader()
-    {
-        this(new HashMap<Integer, Integer>());
-    }
-    
-    /*
-     * This ctor is used while deserializing. This ctor
-     * also builds an index of position to column family
-     * Id.
-    */
-    private CommitLogHeader(Map<Integer, Integer> cfDirtiedAt)
-    {
-        this.cfDirtiedAt = cfDirtiedAt;
-    }
-        
-    boolean isDirty(Integer cfId)
-    {
-        return cfDirtiedAt.containsKey(cfId);
-    } 
-    
-    int getPosition(Integer cfId)
-    {
-        Integer x = cfDirtiedAt.get(cfId);
-        return x == null ? 0 : x;
-    }
-    
-    void turnOn(Integer cfId, long position)
-    {
-        assert position >= 0 && position <= Integer.MAX_VALUE;
-        cfDirtiedAt.put(cfId, (int)position);
-    }
-
-    void turnOff(Integer cfId)
-    {
-        cfDirtiedAt.remove(cfId);
-    }
-
-    boolean isSafeToDelete() throws IOException
-    {
-        return cfDirtiedAt.isEmpty();
-    }
-    
-    // we use cf ids. getting the cf names would be pretty pretty expensive.
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder("");
-        sb.append("CLH(dirty+flushed={");
-        for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
-        {       
-            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(", ");
-        }
-        sb.append("})");
-        return sb.toString();
-    }
-
-    public String dirtyString()
-    {
-        StringBuilder sb = new StringBuilder();
-        for (Map.Entry<Integer, Integer> entry : cfDirtiedAt.entrySet())
-            sb.append(entry.getKey()).append(", ");
-        return sb.toString();
-    }
-
-    static void writeCommitLogHeader(CommitLogHeader header, String headerFile) throws IOException
-    {
-        DataOutputStream out = null;
-        try
-        {
-            /*
-             * FileOutputStream doesn't sync on flush/close.
-             * As headers are "optional" now there is no reason to sync it.
-             * This provides nearly double the performance of BRAF, more under heavey load.
-             */
-            out = new DataOutputStream(new FileOutputStream(headerFile));
-            serializer.serialize(header, out);
-        }
-        finally
-        {
-            if (out != null)
-                out.close();
-        }
-    }
-
-    static CommitLogHeader readCommitLogHeader(String headerFile) throws IOException
-    {
-        DataInputStream reader = null;
-        try
-        {
-            reader = new DataInputStream(new BufferedInputStream(new FileInputStream(headerFile)));
-            return serializer.deserialize(reader);
-        }
-        finally
-        {
-            FileUtils.closeQuietly(reader);
-        }
-    }
-
-    int getReplayPosition()
-    {
-        return cfDirtiedAt.isEmpty() ? -1 : Collections.min(cfDirtiedAt.values());
-    }
-
-    static class CommitLogHeaderSerializer implements ICompactSerializer2<CommitLogHeader>
-    {
-        public void serialize(CommitLogHeader clHeader, DataOutput dos) throws IOException
-        {
-            Checksum checksum = new CRC32();
-
-            // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data.
-            dos.writeInt(clHeader.cfDirtiedAt.size()); // 4
-            checksum.update(clHeader.cfDirtiedAt.size());
-            dos.writeLong(checksum.getValue());
-
-            // write the 2nd checksum after the lastflushedat map
-            for (Map.Entry<Integer, Integer> entry : clHeader.cfDirtiedAt.entrySet())
-            {
-                dos.writeInt(entry.getKey()); // 4
-                checksum.update(entry.getKey());
-                dos.writeInt(entry.getValue()); // 4
-                checksum.update(entry.getValue());
-            }
-            dos.writeLong(checksum.getValue());
-        }
-
-        public CommitLogHeader deserialize(DataInput dis) throws IOException
-        {
-            Checksum checksum = new CRC32();
-
-            int lastFlushedAtSize = dis.readInt();
-            checksum.update(lastFlushedAtSize);
-            if (checksum.getValue() != dis.readLong())
-            {
-                throw new IOException("Invalid or corrupt commitlog header");
-            }
-            Map<Integer, Integer> lastFlushedAt = new HashMap<Integer, Integer>();
-            for (int i = 0; i < lastFlushedAtSize; i++)
-            {
-                int key = dis.readInt();
-                checksum.update(key);
-                int value = dis.readInt();
-                checksum.update(value);
-                lastFlushedAt.put(key, value);
-            }
-            if (checksum.getValue() != dis.readLong())
-            {
-                throw new IOException("Invalid or corrupt commitlog header");
-            }
-
-            return new CommitLogHeader(lastFlushedAt);
-        }
-    }
-}

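Note: the ReplayPosition type that replaces this header is added elsewhere in
this revision and does not appear in this diff. From its call sites below (new
ReplayPosition(id, (int) position), ReplayPosition.NONE,
ReplayPosition.serializer), it is presumably a small value type along the
following lines; this is a sketch under those assumptions, not the committed
class.

    // Sketch only: fields and sentinel inferred from callers in this diff.
    public class ReplayPosition
    {
        // sentinel for writers with no commitlog context (see SSTableWriter below)
        public static final ReplayPosition NONE = new ReplayPosition(-1, -1);

        public final long segment;  // CommitLogSegment.id (its creation timestamp)
        public final int position;  // byte offset within that segment

        public ReplayPosition(long segment, int position)
        {
            this.segment = segment;
            this.position = position;
        }
    }
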
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Mon May  9 07:05:55 2011
@@ -23,6 +23,10 @@ package org.apache.cassandra.db.commitlo
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -30,30 +34,30 @@ import org.apache.cassandra.net.Messagin
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 
 public class CommitLogSegment
 {
     private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class);
+    private static Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile("CommitLog-(\\d+).log");
 
+    public final long id;
     private final BufferedRandomAccessFile logWriter;
-    private final CommitLogHeader header;
+
+    // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
+    private Set<Integer> cfDirty = new HashSet<Integer>();
 
     public CommitLogSegment()
     {
-        this.header = new CommitLogHeader();
-        String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + System.currentTimeMillis() + ".log";
+        id = System.currentTimeMillis();
+        String logFile = DatabaseDescriptor.getCommitLogLocation() + File.separator + "CommitLog-" + id + ".log";
         logger.info("Creating new commitlog segment " + logFile);
 
         try
         {
             logWriter = createWriter(logFile);
-
-            writeHeader();
         }
         catch (IOException e)
         {
@@ -61,14 +65,26 @@ public class CommitLogSegment
         }
     }
 
-    public static boolean possibleCommitLogFile(String filename)
+    // assume filename is a 'possibleCommitLogFile()'
+    public static long idFromFilename(String filename)
     {
-        return filename.matches("CommitLog-\\d+.log");
+        Matcher matcher = COMMIT_LOG_FILE_PATTERN.matcher(filename);
+        try
+        {
+            if (matcher.matches())
+                return Long.valueOf(matcher.group(1));
+            else
+                return -1L;
+        }
+        catch (NumberFormatException e)
+        {
+            return -1L;
+        }
     }
 
-    public void writeHeader() throws IOException
+    public static boolean possibleCommitLogFile(String filename)
     {
-        CommitLogHeader.writeCommitLogHeader(header, getHeaderPath());
+        return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches();
     }
 
     private static BufferedRandomAccessFile createWriter(String file) throws IOException
@@ -76,35 +92,14 @@ public class CommitLogSegment
         return new BufferedRandomAccessFile(new File(file), "rw", 128 * 1024, true);
     }
 
-    public CommitLogSegment.CommitLogContext write(RowMutation rowMutation) throws IOException
+    public ReplayPosition write(RowMutation rowMutation) throws IOException
     {
         long currentPosition = -1L;
         try
         {
             currentPosition = logWriter.getFilePointer();
-            CommitLogSegment.CommitLogContext cLogCtx = new CommitLogSegment.CommitLogContext(currentPosition);
-
-            // update header
-            for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
-            {
-                // we can ignore the serialized map in the header (and avoid deserializing it) since we know we are
-                // writing the cfs as they exist now.  check for null cfm in case a cl write goes through after the cf is
-                // defined but before a new segment is created.
-                CFMetaData cfm = DatabaseDescriptor.getCFMetaData(columnFamily.id());
-                if (cfm == null)
-                {
-                    logger.error("Attempted to write commit log entry for 
unrecognized column family: " + columnFamily.id());
-                }
-                else
-                {
-                    Integer id = cfm.cfId;
-                    if (!header.isDirty(id))
-                    {
-                        header.turnOn(id, logWriter.getFilePointer());
-                        writeHeader();
-                    }
-                }
-            }
+            assert currentPosition <= Integer.MAX_VALUE;
+            ReplayPosition cLogCtx = new ReplayPosition(id, (int) currentPosition);
 
             // write mutation, w/ checksum on the size and data
             Checksum checksum = new CRC32();
@@ -131,14 +126,11 @@ public class CommitLogSegment
         logWriter.sync();
     }
 
-    public CommitLogContext getContext()
-    {
-        return new CommitLogContext(logWriter.getFilePointer());
-    }
-
-    public CommitLogHeader getHeader()
+    public ReplayPosition getContext()
     {
-        return header;
+        long position = logWriter.getFilePointer();
+        assert position <= Integer.MAX_VALUE;
+        return new ReplayPosition(id, (int) position);
     }
 
     public String getPath()
@@ -146,9 +138,9 @@ public class CommitLogSegment
         return logWriter.getPath();
     }
 
-    public String getHeaderPath()
+    public String getName()
     {
-        return CommitLogHeader.getHeaderPathFromSegment(this);
+        return logWriter.getPath().substring(logWriter.getPath().lastIndexOf(File.separator) + 1);
     }
 
     public long length()
@@ -175,34 +167,34 @@ public class CommitLogSegment
         }
     }
 
-    @Override
-    public String toString()
+    void turnOn(Integer cfId)
     {
-        return "CommitLogSegment(" + logWriter.getPath() + ')';
+        cfDirty.add(cfId);
     }
 
-    public class CommitLogContext
+    void turnOff(Integer cfId)
     {
-        public final long position;
+        cfDirty.remove(cfId);
+    }
 
-        public CommitLogContext(long position)
-        {
-            assert position >= 0;
-            this.position = position;
-        }
+    // For debugging, not fast
+    String dirtyString()
+    {
+        StringBuilder sb = new StringBuilder();
+        for (Integer cfId : cfDirty)
+            sb.append(DatabaseDescriptor.getCFMetaData(cfId).cfName).append(" (").append(cfId).append("), ");
+        return sb.toString();
+    }
 
-        public CommitLogSegment getSegment()
-        {
-            return CommitLogSegment.this;
-        }
+    boolean isSafeToDelete()
+    {
+        return cfDirty.isEmpty();
+    }
 
-        @Override
-        public String toString()
-        {
-            return "CommitLogContext(" +
-                   "file='" + logWriter.getPath() + '\'' +
-                   ", position=" + position +
-                   ')';
-        }
+    @Override
+    public String toString()
+    {
+        return "CommitLogSegment(" + logWriter.getPath() + ')';
     }
+
 }

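The two filename helpers above are meant to be used together:
possibleCommitLogFile() filters candidate files and idFromFilename() extracts
the segment id embedded in the name. A minimal usage sketch (the filenames are
illustrative):

    // Matching names yield their timestamp id; anything else yields -1.
    String good = "CommitLog-1304924755123.log";
    String bad = "CommitLog-backup.log";
    assert CommitLogSegment.possibleCommitLogFile(good);
    assert CommitLogSegment.idFromFilename(good) == 1304924755123L;
    assert !CommitLogSegment.possibleCommitLogFile(bad);
    assert CommitLogSegment.idFromFilename(bad) == -1L;
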
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java Mon May  9 07:05:55 2011
@@ -21,6 +21,7 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import java.sql.Types;
 
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.context.CounterContext;
@@ -52,4 +53,39 @@ public abstract class AbstractCommutativ
     {
         return Long.class;
     }
+
+    public boolean isSigned()
+    {
+        return true;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return false;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(Long obj)
+    {
+        return obj.toString().length();
+    }
+
+    public int getScale(Long obj)
+    {
+        return 0;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.INTEGER;
+    }
+
+    public boolean needsQuotes()
+    {
+        return false;
+    }
 }

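These methods give each AbstractType subclass enough metadata to back a
java.sql.ResultSetMetaData implementation on the CQL driver side. A hedged
sketch of a consumer; the describe() helper here is hypothetical, not part of
this commit:

    import org.apache.cassandra.db.marshal.AbstractType;

    public class TypeMetadata
    {
        // Hypothetical helper: summarizes the new JDBC metadata for one value.
        public static <T> String describe(AbstractType<T> type, T value)
        {
            return String.format("jdbcType=%d signed=%b caseSensitive=%b precision=%d scale=%d",
                                 type.getJdbcType(), type.isSigned(), type.isCaseSensitive(),
                                 type.getPrecision(value), type.getScale(value));
        }
    }
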
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Mon May  9 07:05:55 2011
@@ -147,4 +147,16 @@ public abstract class AbstractType<T> im
     
     /** returns the class this AbstractType represents. */
     public abstract Class<T> getType();
+
+    //
+    // JDBC metadata
+    //
+
+    public abstract boolean isSigned();
+    public abstract boolean isCaseSensitive();
+    public abstract boolean isCurrency();
+    public abstract int getPrecision(T obj);
+    public abstract int getScale(T obj);
+    public abstract int getJdbcType();
+    public abstract boolean needsQuotes();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Mon May  9 07:05:55 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.sql.Types;
 
 import com.google.common.base.Charsets;
 
@@ -86,4 +87,39 @@ public class AsciiType extends AbstractT
     {
         return String.class;
     }
+
+    public boolean isSigned()
+    {
+        return false;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return true;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(String obj)
+    {
+        return -1;
+    }
+
+    public int getScale(String obj)
+    {
+        return -1;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.VARCHAR;
+    }
+
+    public boolean needsQuotes()
+    {
+        return true;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/BytesType.java Mon May  9 07:05:55 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal;
 
 
 import java.nio.ByteBuffer;
+import java.sql.Types;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -88,4 +89,39 @@ public class BytesType extends AbstractT
     {
         return ByteBuffer.class;
     }
+
+    public boolean isSigned()
+    {
+        return false;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return false;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(ByteBuffer obj)
+    {
+        return -1;
+    }
+
+    public int getScale(ByteBuffer obj)
+    {
+        return -1;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.BINARY;
+    }
+
+    public boolean needsQuotes()
+    {
+        return true;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/IntegerType.java Mon May  9 07:05:55 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.marshal;
 
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.sql.Types;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.thrift.TBaseHelper;
@@ -169,4 +170,39 @@ public final class IntegerType extends A
     {
         return BigInteger.class;
     }
+
+    public boolean isSigned()
+    {
+        return true;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return false;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(BigInteger obj)
+    {
+        return obj.toString().length();
+    }
+
+    public int getScale(BigInteger obj)
+    {
+        return 0;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.BIGINT;
+    }
+
+    public boolean needsQuotes()
+    {
+        return false;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java Mon May  9 07:05:55 2011
@@ -27,7 +27,7 @@ import java.util.UUID;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
-public class LexicalUUIDType extends AbstractType<UUID>
+public class LexicalUUIDType extends AbstractUUIDType
 {
     public static final LexicalUUIDType instance = new LexicalUUIDType();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java Mon May  9 07:05:55 2011
@@ -77,4 +77,39 @@ public class LocalByPartionerType<T exte
     {
         return ByteBuffer.class;
     }
+
+    public boolean isSigned()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isCaseSensitive()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isCurrency()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int getPrecision(ByteBuffer obj)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int getScale(ByteBuffer obj)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public int getJdbcType()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean needsQuotes()
+    {
+        throw new UnsupportedOperationException();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/LongType.java Mon May  9 07:05:55 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal;
 
 
 import java.nio.ByteBuffer;
+import java.sql.Types;
 
 import org.apache.cassandra.utils.ByteBufferUtil;
 
@@ -109,4 +110,39 @@ public class LongType extends AbstractTy
     {
         return Long.class;
     }
+
+    public boolean isSigned()
+    {
+        return true;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return false;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(Long obj)
+    {
+        return obj.toString().length();
+    }
+
+    public int getScale(Long obj)
+    {
+        return 0;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.INTEGER;
+    }
+
+    public boolean needsQuotes()
+    {
+        return false;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java Mon May  9 07:05:55 2011
@@ -31,7 +31,7 @@ import org.apache.cassandra.utils.FBUtil
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang.time.DateUtils;
 
-public class TimeUUIDType extends AbstractType<UUID>
+public class TimeUUIDType extends AbstractUUIDType
 {
     public static final TimeUUIDType instance = new TimeUUIDType();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UTF8Type.java Mon May  9 07:05:55 2011
@@ -22,6 +22,7 @@ package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
+import java.sql.Types;
 
 import com.google.common.base.Charsets;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -196,4 +197,39 @@ public class UTF8Type extends AbstractTy
     {
         return String.class;
     }
+
+    public boolean isSigned()
+    {
+        return false;
+    }
+
+    public boolean isCaseSensitive()
+    {
+        return true;
+    }
+
+    public boolean isCurrency()
+    {
+        return false;
+    }
+
+    public int getPrecision(String obj)
+    {
+        return -1;
+    }
+
+    public int getScale(String obj)
+    {
+        return -1;
+    }
+
+    public int getJdbcType()
+    {
+        return Types.VARCHAR;
+    }
+
+    public boolean needsQuotes()
+    {
+        return true;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/UUIDType.java Mon May  9 07:05:55 2011
@@ -43,7 +43,7 @@ import org.apache.cassandra.utils.UUIDGe
  * @see "com.fasterxml.uuid.UUIDComparator"
  * 
  */
-public class UUIDType extends AbstractType<UUID>
+public class UUIDType extends AbstractUUIDType
 {
     public static final UUIDType instance = new UUIDType();
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Mon May  9 07:05:55 2011
@@ -38,7 +38,7 @@ import org.apache.cassandra.utils.Pair;
 public class Descriptor
 {
     public static final String LEGACY_VERSION = "a";
-    public static final String CURRENT_VERSION = "f";
+    public static final String CURRENT_VERSION = "g";
 
     public final File directory;
     public final String version;
@@ -80,6 +80,11 @@ public class Descriptor
         isLatestVersion = version.compareTo(CURRENT_VERSION) == 0;
     }
 
+    public boolean hasReplayPosition()
+    {
+        return version.compareTo("g") >= 0;
+    }
+
     public String filenameFor(Component component)
     {
         return filenameFor(component.name());

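Version strings here are single letters compared lexicographically, so
hasReplayPosition() admits the new CURRENT_VERSION "g" and anything later
while rejecting older sstables, whose stats component lacks the field. A quick
sketch of how the gate behaves:

    // Pre-"g" sstables carry no ReplayPosition in their stats component.
    assert "f".compareTo("g") < 0;   // old sstable: field absent, reader skips it
    assert "g".compareTo("g") >= 0;  // current version: field present
    assert "h".compareTo("g") >= 0;  // any later version keeps the field
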
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Mon May  9 07:05:55 2011
@@ -31,6 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileUtils;
@@ -67,42 +68,53 @@ public abstract class SSTable
     public final CFMetaData metadata;
     public final IPartitioner partitioner;
 
+    public final ReplayPosition replayPosition;
+
     protected final EstimatedHistogram estimatedRowSize;
     protected final EstimatedHistogram estimatedColumnCount;
 
-    protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner)
+    protected SSTable(Descriptor descriptor, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
     {
-        this(descriptor, new HashSet<Component>(), metadata, partitioner);
+        this(descriptor, new HashSet<Component>(), metadata, replayPosition, partitioner);
     }
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner)
-    {
-        this(descriptor, components, metadata, partitioner, defaultRowHistogram(), defaultColumnHistogram());
-    }
-
-    static EstimatedHistogram defaultColumnHistogram()
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner)
     {
-        return new EstimatedHistogram(114);
+        this(descriptor, components, metadata, replayPosition, partitioner, defaultRowHistogram(), defaultColumnHistogram());
     }
 
-    static EstimatedHistogram defaultRowHistogram()
+    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
     {
-        return new EstimatedHistogram(150);
-    }
+        assert descriptor != null;
+        assert components != null;
+        assert metadata != null;
+        assert replayPosition != null;
+        assert partitioner != null;
+        assert rowSizes != null;
+        assert columnCounts != null;
 
-    protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts)
-    {
         this.descriptor = descriptor;
         Set<Component> dataComponents = new HashSet<Component>(components);
         for (Component component : components)
             assert component.type != Component.Type.COMPACTED_MARKER;
         this.components = Collections.unmodifiableSet(dataComponents);
         this.metadata = metadata;
+        this.replayPosition = replayPosition;
         this.partitioner = partitioner;
         estimatedRowSize = rowSizes;
         estimatedColumnCount = columnCounts;
     }
 
+    static EstimatedHistogram defaultColumnHistogram()
+    {
+        return new EstimatedHistogram(114);
+    }
+
+    static EstimatedHistogram defaultRowHistogram()
+    {
+        return new EstimatedHistogram(150);
+    }
+
     public EstimatedHistogram getEstimatedRowSize()
     {
         return estimatedRowSize;

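With the flush position carried on each sstable, the bookkeeping the deleted
CommitLogHeader did can be reconstructed from the sstables themselves. A
sketch of the idea for a single column family, assuming ReplayPosition orders
by segment id and then byte offset (the comparison logic is not shown in this
diff):

    // Sketch: mutations at or before the latest flushed position of this
    // column family are already persisted and need not be replayed.
    ReplayPosition latest = ReplayPosition.NONE;
    for (SSTableReader sstable : sstables)
    {
        ReplayPosition rp = sstable.replayPosition;
        if (rp.segment > latest.segment
            || (rp.segment == latest.segment && rp.position > latest.position))
            latest = rp;
    }
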
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon May  9 07:05:55 2011
@@ -34,6 +34,7 @@ import org.apache.cassandra.cache.Instru
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -156,15 +157,18 @@ public class SSTableReader extends SSTab
         EstimatedHistogram rowSizes;
         EstimatedHistogram columnCounts;
         File statsFile = new File(descriptor.filenameFor(SSTable.COMPONENT_STATS));
+        ReplayPosition rp = ReplayPosition.NONE;
         if (statsFile.exists())
         {
             DataInputStream dis = null;
             try
             {
-                logger.debug("Load statistics for {}", descriptor);
+                logger.debug("Load metadata for {}", descriptor);
                 dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
                 rowSizes = EstimatedHistogram.serializer.deserialize(dis);
                 columnCounts = EstimatedHistogram.serializer.deserialize(dis);
+                if (descriptor.hasReplayPosition())
+                    rp = ReplayPosition.serializer.deserialize(dis);
             }
             finally
             {
@@ -178,7 +182,7 @@ public class SSTableReader extends SSTab
             columnCounts = SSTable.defaultColumnHistogram();
         }
 
-        SSTableReader sstable = new SSTableReader(descriptor, components, metadata, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts);
+        SSTableReader sstable = new SSTableReader(descriptor, components, metadata, rp, partitioner, null, null, null, null, System.currentTimeMillis(), rowSizes, columnCounts);
         sstable.setTrackedBy(tracker);
 
         // versions before 'c' encoded keys as utf-16 before hashing to the filter
@@ -203,16 +207,17 @@ public class SSTableReader extends SSTab
     /**
      * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
      */
-    static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize,
+    static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, ReplayPosition replayPosition, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, Filter bf, long maxDataAge, EstimatedHistogram rowsize,
                                       EstimatedHistogram columncount) throws IOException
     {
         assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
-        return new SSTableReader(desc, components, metadata, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
+        return new SSTableReader(desc, components, metadata, replayPosition, partitioner, ifile, dfile, isummary, bf, maxDataAge, rowsize, columncount);
     }
 
     private SSTableReader(Descriptor desc,
                           Set<Component> components,
                           CFMetaData metadata,
+                          ReplayPosition replayPosition,
                           IPartitioner partitioner,
                           SegmentedFile ifile,
                           SegmentedFile dfile,
@@ -223,7 +228,7 @@ public class SSTableReader extends SSTab
                           EstimatedHistogram columnCounts)
     throws IOException
     {
-        super(desc, components, metadata, partitioner, rowSizes, columnCounts);
+        super(desc, components, metadata, replayPosition, partitioner, rowSizes, columnCounts);
         this.maxDataAge = maxDataAge;
 
         this.ifile = ifile;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Mon May  9 07:05:55 2011
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import com.google.common.collect.Sets;
 
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.io.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.slf4j.Logger;
@@ -62,14 +63,15 @@ public class SSTableWriter extends SSTab
 
     public SSTableWriter(String filename, long keyCount) throws IOException
     {
-        this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner());
+        this(filename, keyCount, DatabaseDescriptor.getCFMetaData(Descriptor.fromFilename(filename)), StorageService.getPartitioner(), ReplayPosition.NONE);
     }
 
-    public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner) throws IOException
+    public SSTableWriter(String filename, long keyCount, CFMetaData metadata, IPartitioner partitioner, ReplayPosition replayPosition) throws IOException
     {
         super(Descriptor.fromFilename(filename),
               new HashSet<Component>(Arrays.asList(Component.DATA, Component.FILTER, Component.PRIMARY_INDEX, Component.STATS)),
               metadata,
+              replayPosition,
               partitioner,
               SSTable.defaultRowHistogram(),
               SSTable.defaultColumnHistogram());
@@ -182,7 +184,7 @@ public class SSTableWriter extends SSTab
         FileUtils.truncate(dataFile.getPath(), position);
 
         // write sstable statistics
-        writeStatistics(descriptor, estimatedRowSize, estimatedColumnCount);
+        writeMetadata(descriptor, estimatedRowSize, estimatedColumnCount, replayPosition);
 
         // remove the 'tmp' marker from all components
         final Descriptor newdesc = rename(descriptor, components);
@@ -190,20 +192,21 @@ public class SSTableWriter extends SSTab
         // finalize in-memory state for the reader
         SegmentedFile ifile = iwriter.builder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
         SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
-        SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
+        SSTableReader sstable = SSTableReader.internalOpen(newdesc, components, metadata, replayPosition, partitioner, ifile, dfile, iwriter.summary, iwriter.bf, maxDataAge, estimatedRowSize, estimatedColumnCount);
         iwriter = null;
         dbuilder = null;
         return sstable;
     }
 
-    private static void writeStatistics(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnnCounts) throws IOException
+    private static void writeMetadata(Descriptor desc, EstimatedHistogram rowSizes, EstimatedHistogram columnCounts, ReplayPosition rp) throws IOException
     {
         BufferedRandomAccessFile out = new BufferedRandomAccessFile(new File(desc.filenameFor(SSTable.COMPONENT_STATS)),
                                                                     "rw",
                                                                     BufferedRandomAccessFile.DEFAULT_BUFFER_SIZE,
                                                                     true);
         EstimatedHistogram.serializer.serialize(rowSizes, out);
-        EstimatedHistogram.serializer.serialize(columnnCounts, out);
+        EstimatedHistogram.serializer.serialize(columnCounts, out);
+        ReplayPosition.serializer.serialize(rp, out);
         out.close();
     }
 
@@ -457,7 +460,7 @@ public class SSTableWriter extends SSTab
 
                 rows++;
             }
-            writeStatistics(desc, rowSizes, columnCounts);
+            writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
             return rows;
         }
     }
@@ -527,7 +530,7 @@ public class SSTableWriter extends SSTab
 
                 rows++;
             }
-            writeStatistics(desc, rowSizes, columnCounts);
+            writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
 
             if (writerDfile.getFilePointer() != dfile.getFilePointer())
             {

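After this change the stats component holds, in order: the row-size histogram,
the column-count histogram, and a serialized ReplayPosition. The read side in
SSTableReader.open above mirrors this layout, attempting the third field only
when descriptor.hasReplayPosition(). A read-side sketch:

    // On-disk order: rowSizes, columnCounts, then (version >= "g") ReplayPosition.
    DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(statsFile)));
    try
    {
        EstimatedHistogram rowSizes = EstimatedHistogram.serializer.deserialize(dis);
        EstimatedHistogram columnCounts = EstimatedHistogram.serializer.deserialize(dis);
        ReplayPosition rp = descriptor.hasReplayPosition()
                          ? ReplayPosition.serializer.deserialize(dis)
                          : ReplayPosition.NONE;
    }
    finally
    {
        FileUtils.closeQuietly(dis);
    }
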
Modified: cassandra/trunk/test/system/test_cql.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_cql.py (original)
+++ cassandra/trunk/test/system/test_cql.py Mon May  9 07:05:55 2011
@@ -152,29 +152,68 @@ class TestCql(ThriftTester):
     def test_select_row_range(self):
         "retrieve a range of rows with columns"
         cursor = init()
-        cursor.execute("""
-            SELECT 4 FROM StandardLongA WHERE KEY > 'ad' AND KEY < 'ag';
-        """)
-        rows = [row[0] for row in cursor.fetchall()]
-        assert ['ad', 'ae', 'af', 'ag'] == rows, rows
 
-    def test_select_row_range_with_limit(self):
-        "retrieve a limited range of rows with columns"
-        cursor = init()
-        cursor.execute("""
-            SELECT 1,5,9 FROM StandardLongA WHERE KEY > 'aa'
-                    AND KEY < 'ag' LIMIT 3
-        """)
-        assert cursor.rowcount == 3
-
-        cursor.execute("""
-            SELECT 20,40 FROM StandardIntegerA WHERE KEY > 'k1'
-                    AND KEY < 'k7' LIMIT 5
-        """)
-        assert cursor.rowcount == 5
-        for i in range(5):
-            r = cursor.fetchone()
-            assert r[0] == "k%d" % (i+1)
+        # everything
+        cursor.execute("SELECT * FROM StandardLongA")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys
+
+        # [start, end], mid-row
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'ad' AND KEY 
<= 'ag'")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['ad', 'ae', 'af', 'ag'] == keys, keys
+
+        # (start, end), mid-row
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'ad' AND KEY < 
'ag'")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['ae', 'af'] == keys, keys
+
+        # [start, end], full-row
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'aa' AND KEY 
<= 'ag'")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys
+
+        # (start, end), full-row
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' AND KEY < 
'g'")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa', 'ab', 'ac', 'ad', 'ae', 'af', 'ag'] == keys, keys
+
+        # LIMIT tests
+
+        # no WHERE
+        cursor.execute("SELECT * FROM StandardLongA LIMIT 1")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa'] == keys, keys
+
+        # with >=, non-existing key
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'a' LIMIT 1")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa'] == keys, keys
+
+        # with >=, existing key
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY >= 'aa' LIMIT 1")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa'] == keys, keys
+
+        # with >, non-existing key
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' LIMIT 1")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa'] == keys, keys
+
+        # with >, existing key
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'aa' LIMIT 1")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['ab'] == keys, keys
+
+        # with both > and <, existing keys
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'aa' and KEY < 
'ag' LIMIT 5")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['ab', 'ac', 'ad', 'ae', 'af'] == keys, keys
+
+        # with both > and <, non-existing keys
+        cursor.execute("SELECT * FROM StandardLongA WHERE KEY > 'a' and KEY < 
'b' LIMIT 5")
+        keys = [row[0] for row in cursor.fetchall()]
+        assert ['aa', 'ab', 'ac', 'ad', 'ae'] == keys, keys
 
     def test_select_columns_slice(self):
         "column slice tests"
@@ -287,7 +326,7 @@ class TestCql(ThriftTester):
         cursor = init()
         cursor.execute("""
             SELECT 'birthdate' FROM IndexedA WHERE 'birthdate' = 100
-                    AND KEY > 'asmithZ'
+                    AND KEY >= 'asmithZ'
         """)
         assert cursor.rowcount == 1
         r = cursor.fetchone()

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CommitLogTest.java Mon May  9 07:05:55 2011
@@ -27,36 +27,15 @@ import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogHeader;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.Pair;
 
 public class CommitLogTest extends CleanupHelper
 {
     @Test
-    public void testRecoveryWithEmptyHeader() throws Exception
-    {
-        testRecovery(new byte[0], new byte[10]);
-    }
-
-    @Test
-    public void testRecoveryWithShortHeader() throws Exception
-    {
-        testRecovery(new byte[2], new byte[10]);
-    }
-
-    @Test
-    public void testRecoveryWithGarbageHeader() throws Exception
-    {
-        byte[] garbage = new byte[100];
-        (new java.util.Random()).nextBytes(garbage);
-        testRecovery(garbage, garbage);
-    }
-
-    @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
-        CommitLog.recover(new File[] {tmpFiles().right});
+        CommitLog.recover(new File[] {tmpFile()});
     }
 
     @Test
@@ -69,13 +48,13 @@ public class CommitLogTest extends Clean
     @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        testRecovery(new byte[0], new byte[2]);
+        testRecovery(new byte[2]);
     }
 
     @Test
     public void testRecoveryWithShortCheckSum() throws Exception
     {
-        testRecovery(new byte[0], new byte[6]);
+        testRecovery(new byte[6]);
     }
 
     @Test
@@ -83,7 +62,7 @@ public class CommitLogTest extends Clean
     {
         byte[] garbage = new byte[100];
         (new java.util.Random()).nextBytes(garbage);
-        testRecovery(new byte[0], garbage);
+        testRecovery(garbage);
     }
 
     @Test
@@ -108,30 +87,6 @@ public class CommitLogTest extends Clean
         testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
     }
 
-    @Test
-    public void testRecoveryWithHeaderPositionGreaterThanLogLength() throws Exception
-    {
-        // Note: this can actually happen (in periodic mode) when data is flushed
-        // before it had time to hit the commitlog (since the header is flushed by the system)
-        // see https://issues.apache.org/jira/browse/CASSANDRA-2285
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(out);
-        Checksum checksum = new CRC32();
-
-        // write the first checksum after the fixed-size part, so we won't read garbage lastFlushedAt data.
-        dos.writeInt(1);
-        checksum.update(1);
-        dos.writeLong(checksum.getValue());
-        dos.writeInt(0);
-        checksum.update(0);
-        dos.writeInt(200);
-        checksum.update(200);
-        dos.writeLong(checksum.getValue());
-        dos.close();
-
-        testRecovery(out.toByteArray(), new byte[0]);
-    }
-
     protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
     {
         Checksum checksum = new CRC32();
@@ -147,29 +102,22 @@ public class CommitLogTest extends Clean
         dout.writeLong(checksum);
         dout.write(new byte[dataSize]);
         dout.close();
-        testRecovery(new byte[0], out.toByteArray());
+        testRecovery(out.toByteArray());
     }
 
-    protected Pair<File, File> tmpFiles() throws IOException
+    protected File tmpFile() throws IOException
     {
         File logFile = File.createTempFile("testRecoveryWithPartiallyWrittenHeaderTestFile", null);
-        File headerFile = new File(CommitLogHeader.getHeaderPathFromSegmentPath(logFile.getAbsolutePath()));
         logFile.deleteOnExit();
-        headerFile.deleteOnExit();
         assert logFile.length() == 0;
-        assert headerFile.length() == 0;
-        return new Pair<File, File>(headerFile, logFile);
+        return logFile;
     }
 
-    protected void testRecovery(byte[] headerData, byte[] logData) throws Exception
+    protected void testRecovery(byte[] logData) throws Exception
     {
-        Pair<File, File> tmpFiles = tmpFiles();
-        File logFile = tmpFiles.right;
-        File headerFile = tmpFiles.left;
+        File logFile = tmpFile();
         OutputStream lout = new FileOutputStream(logFile);
-        OutputStream hout = new FileOutputStream(headerFile);
         lout.write(logData);
-        hout.write(headerData);
         //statics make it annoying to test things correctly
         CommitLog.recover(new File[] {logFile}); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsPurgeTest.java Mon May  9 07:05:55 2011
@@ -223,7 +223,7 @@ public class CompactionsPurgeTest extend
 
         // Check that the second insert did went in
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
-        assert cf.getColumnCount() == 10;
+        assertEquals(10, cf.getColumnCount());
         for (IColumn c : cf)
             assert !c.isMarkedForDelete();
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=1100900&r1=1100899&r2=1100900&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Mon May  9 07:05:55 2011
@@ -59,17 +59,12 @@ public class RecoveryManager2Test extend
         logger.debug("forcing flush");
         cfs.forceBlockingFlush();
 
-        // remove Standard1 SSTable/MemTables
-        cfs.clearUnsafe();
-
         logger.debug("begin manual replay");
-        // replay the commit log (nothing should be replayed since everything was flushed)
+        // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
+        // will be replayed)
         CommitLog.instance.resetUnsafe();
-        CommitLog.recover();
-
-        // since everything that was flushed was removed (i.e. clearUnsafe)
-        // and the commit shouldn't have replayed anything, there should be no data
-        assert Util.getRangeSlice(cfs).isEmpty();
+        int replayed = CommitLog.recover();
+        assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
     }
 
     private void insertRow(String cfname, String key) throws IOException

