Author: jbellis
Date: Thu Jun 18 20:22:14 2009
New Revision: 786244

URL: http://svn.apache.org/viewvc?rev=786244&view=rev
Log:
update comments, perform some renames, r/m unused code.
patch by jbellis; reviewed by goffinet for CASSANDRA-237

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogEntry.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=786244&r1=786243&r2=786244&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Jun 18 20:22:14 2009
@@ -43,19 +43,26 @@
  * header represented by the abstraction CommitLogHeader. The header
  * contains a bit array and an array of longs and both the arrays are
  * of size, #column families for the Table, the Commit Log represents.
+ *
  * Whenever a ColumnFamily is written to, for the first time its bit flag
  * is set to one in the CommitLogHeader. When it is flushed to disk by the
  * Memtable its corresponding bit in the header is set to zero. This helps
  * track which CommitLogs can be thrown away as a result of Memtable flushes.
- * However if a ColumnFamily is flushed and again written to disk then its
+ * Additionally, when a ColumnFamily is flushed and written to disk, its
  * entry in the array of longs is updated with the offset in the Commit Log
  * file where it was written. This helps speed up recovery since we can seek
  * to these offsets and start processing the commit log.
- * Every Commit Log is rolled over everytime it reaches its threshold in size.
+ *
+ * Every Commit Log is rolled over every time it reaches its threshold in size;
+ * the new log inherits the "dirty" bits from the old.
+ *
  * Over time there could be a number of commit logs that would be generated.
- * Hovever whenever we flush a column family disk and update its bit flag we
- * take this bit array and bitwise & it with the headers of the other commit
- * logs that are older.
+ * To allow cleaning up non-active commit logs, whenever we flush a column family and update its bit flag in
+ * the active CL, we take the dirty bit array and bitwise & it with the headers of the older logs.
+ * If the result is 0, then it is safe to remove the older file.  (Since the new CL
+ * inherited the old's dirty bitflags, getting a zero for any given bit in the anding
+ * means that either the CF was clean in the old CL or it has been flushed since the
+ * switch in the new.)
  *
  * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] )
  */
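
The deletability rule described in this javadoc reduces to a bitwise AND over two headers. A minimal sketch, assuming plain byte arrays for the dirty bits (the names here are illustrative, not the actual Cassandra API):

    // Returns true when an older commit log can be removed: AND each of its
    // dirty bits with the active log's bit for the same column family. A zero
    // means the CF was either clean in the old log or has been flushed since
    // the log switch, so the old file holds nothing that still needs replay.
    static boolean safeToDeleteOldLog(byte[] oldDirty, byte[] activeDirty)
    {
        for (int i = 0; i < oldDirty.length; i++)
        {
            if ((oldDirty[i] & activeDirty[i]) != 0)
                return false; // some CF still has un-flushed data in the old log
        }
        return true;
    }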
@@ -66,34 +73,24 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(CommitLog.class);
     private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
-    
+
     public static final class CommitLogContext
     {
         static CommitLogContext NULL = new CommitLogContext(null, -1L);
         /* Commit Log associated with this operation */
-        private String file_;
+        public final String file;
         /* Offset within the Commit Log where this row as added */
-        private long position_;
+        public final long position;
 
         public CommitLogContext(String file, long position)
         {
-            file_ = file;
-            position_ = position;
+            this.file = file;
+            this.position = position;
         }
 
         boolean isValidContext()
         {
-            return (position_ != -1L);
-        }
-
-        String file()
-        {
-            return file_;
-        }
-
-        long position()
-        {
-            return position_;
+            return (position != -1L);
         }
     }
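
Making the fields public final turns CommitLogContext into a simple immutable value object; callers that previously went through the removed accessors now read the fields directly. A hypothetical call site:

    // before: String file = cLogCtx.file();  long pos = cLogCtx.position();
    String file = cLogCtx.file;
    long pos = cLogCtx.position;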
 
@@ -193,13 +190,8 @@
     */
     private void setNextFileName()
     {
-        logFile_ = DatabaseDescriptor.getLogFileLocation() +
-                            System.getProperty("file.separator") +
-                            "CommitLog-" +
-                            table_ +
-                            "-" +
-                            System.currentTimeMillis() +
-                            ".log";
+        logFile_ = DatabaseDescriptor.getLogFileLocation() + System.getProperty("file.separator") +
+                   "CommitLog-" + table_ + "-" + System.currentTimeMillis() + ".log";
     }
 
     /*
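
The resulting names follow the pattern CommitLog-<table>-<millis>.log in the configured log directory. A sketch with made-up values standing in for DatabaseDescriptor.getLogFileLocation() and table_:

    import java.io.File;

    public class LogFileNameSketch
    {
        public static void main(String[] args)
        {
            String logDir = "/var/lib/cassandra/commitlog"; // hypothetical location
            String table = "Table1";                        // hypothetical table name
            String logFile = logDir + File.separator + "CommitLog-" + table + "-"
                             + System.currentTimeMillis() + ".log";
            // e.g. /var/lib/cassandra/commitlog/CommitLog-Table1-1245356534000.log
            System.out.println(logFile);
        }
    }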
@@ -316,17 +308,6 @@
         doRecovery(filesNeeded, header);
     }
 
-    private void printHeader(byte[] header)
-    {
-        StringBuilder sb = new StringBuilder("");
-        for ( byte b : header )
-        {
-            sb.append(b);
-            sb.append(" ");
-        }
-        logger_.debug(sb.toString());
-    }
-
     private void doRecovery(Stack<File> filesNeeded, byte[] header) throws IOException
     {
         Table table = Table.open(table_);
@@ -395,19 +376,19 @@
                     }
                     catch ( IOException e )
                     {
-                        logger_.debug( LogUtil.throwableToString(e) );
+                        logger_.error("Unexpected error reading " + file.getName() + "; attempting to continue with the next entry", e);
                     }
                 }
                 reader.close();
-                /* apply the rows read */
+                /* apply the rows read -- success will result in the CL file being discarded */
                 table.flush(true);
             }
-            catch ( Throwable th )
+            catch (Throwable th)
             {
-                logger_.info( LogUtil.throwableToString(th) );
+                logger_.error("Fatal error reading " + file.getName(), th);
                 /* close the reader and delete this commit log. */
                 reader.close();
-                FileUtils.delete( new File[]{file} );
+                FileUtils.delete(new File[]{ file });
             }
         }
     }
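
The logging changes make the two failure modes in doRecovery() explicit: a bad entry is logged and skipped, while anything worse aborts the file and deletes it. A condensed sketch of that pattern (reader, file, table, and applyNextRow are stand-ins for the surrounding code):

    try
    {
        while (!reader.isEOF())
        {
            try
            {
                applyNextRow(reader); // hypothetical stand-in for deserializing and applying one row
            }
            catch (IOException e)
            {
                // per-entry failure: log it and continue with the next entry
                logger_.error("Unexpected error reading " + file.getName()
                              + "; attempting to continue with the next entry", e);
            }
        }
        reader.close();
        table.flush(true); // success makes this commit log file discardable
    }
    catch (Throwable th)
    {
        // whole-file failure: give up on this log and delete it
        logger_.error("Fatal error reading " + file.getName(), th);
        reader.close();
        FileUtils.delete(new File[]{ file });
    }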
@@ -449,8 +430,7 @@
         long currentPosition = -1L;
         CommitLogContext cLogCtx = null;
         DataOutputBuffer cfBuffer = new DataOutputBuffer();
-        long fileSize = 0L;
-        
+
         try
         {
             /* serialize the row */
@@ -494,11 +474,7 @@
     /*
      * Check if old commit logs can be deleted. However we cannot
      * do this anymore in the Fast Sync mode and hence I think we
-     * should get rid of Fast Sync mode altogether. If there is
-     * a pathological event where few CF's are rarely being updated
-     * then their Memtable never gets flushed.
-     * This will prevent commit logs from being deleted. WE NEED to
-     * fix this using some hueristic and force flushing such Memtables.
+     * should get rid of Fast Sync mode altogether.
      *
      * param @ cLogCtx The commitLog context .
      * param @ id id of the columnFamily being flushed to disk.
@@ -507,14 +483,14 @@
     private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
     {
         /* retrieve the commit log header associated with the file in the context */
-        CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file());
+        CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
         if(commitLogHeader == null )
         {
-            if( logFile_.equals(cLogCtx.file()) )
+            if( logFile_.equals(cLogCtx.file) )
             {
                 /* this means we are dealing with the current commit log. */
                 commitLogHeader = clHeader_;
-                clHeaders_.put(cLogCtx.file(), clHeader_);
+                clHeaders_.put(cLogCtx.file, clHeader_);
             }
             else
                 return;
@@ -525,7 +501,7 @@
          * flush. Right now this cannot happen since Memtables are flushed on a single
          * thread.
         */
-        if ( cLogCtx.position() < commitLogHeader.getPosition(id) )
+        if ( cLogCtx.position < commitLogHeader.getPosition(id) )
             return;
         commitLogHeader.turnOff(id);
         /* Sort the commit logs based on creation time */
@@ -542,7 +518,7 @@
         */
         for(String oldFile : oldFiles)
         {
-            if(oldFile.equals(cLogCtx.file()))
+            if(oldFile.equals(cLogCtx.file))
             {
                 /*
                  * We need to turn on again. This is because we always keep
@@ -550,8 +526,8 @@
                  * commit log needs to be read. When a flush occurs we turn off
                  * perform & operation and then turn on with the new position.
                 */
-                commitLogHeader.turnOn(id, cLogCtx.position());
-                writeCommitLogHeader(cLogCtx.file(), commitLogHeader.toByteArray());
+                commitLogHeader.turnOn(id, cLogCtx.position);
+                writeCommitLogHeader(cLogCtx.file, commitLogHeader.toByteArray());
                 break;
             }
             else
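
Putting the discard() pieces together: the flushed CF's bit is turned off, older logs are AND-ed against the live header and deleted when clean, and the active log is re-marked dirty at the new flush position. A condensed sketch with hypothetical helpers (headerFor, logFilesOldestFirst, writeHeader, deleteLog):

    void onMemtableFlushSketch(CommitLogContext ctx, int id) throws IOException
    {
        CommitLogHeader header = headerFor(ctx.file);
        if (ctx.position < header.getPosition(id))
            return; // a stale flush; a newer write position is already recorded
        header.turnOff(id);
        for (String logFile : logFilesOldestFirst())
        {
            if (logFile.equals(ctx.file))
            {
                // the active log stays live: mark the CF dirty again at the
                // new flush position and persist the updated header
                header.turnOn(id, ctx.position);
                writeHeader(logFile, header);
                break;
            }
            // an older log: AND in the live dirty bits; delete when clean
            CommitLogHeader old = headerFor(logFile);
            old.and(header);
            if (old.isSafeToDelete())
                deleteLog(logFile);
        }
    }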
@@ -577,66 +553,30 @@
         }
     }
 
-    private void checkThresholdAndRollLog()
+    private void checkThresholdAndRollLog() throws IOException
     {
-        try
-        {
-            if (logWriter_.getFileSize() >= SEGMENT_SIZE)
-            {
-                /* Rolls the current log file over to a new one. */
-                setNextFileName();
-                String oldLogFile = logWriter_.getFileName();
-                //history_.add(oldLogFile);
-                logWriter_.close();
-
-                /* point reader/writer to a new commit log file. */
-                // logWriter_ = SequenceFile.writer(logFile_);
-                logWriter_ = CommitLog.createWriter(logFile_);
-                /* squirrel away the old commit log header */
-                clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
-                /*
-                 * We need to zero out positions because the positions in
-                 * the old file do not make sense in the new one.
-                */
-                clHeader_.zeroPositions();
-                writeCommitLogHeader(clHeader_.toByteArray(), false);
-                // Get the list of files in commit log directory if it is greater than a certain number
-                // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
-                // by accumulating the commit logs .
-            }
-        }
-        catch (IOException e)
+        if (logWriter_.getFileSize() >= SEGMENT_SIZE)
         {
-            logger_.info(LogUtil.throwableToString(e));
-        }
-    }
-
-    public static void main(String[] args) throws Throwable
-    {
-        LogUtil.init();
-        
-        File logDir = new File(DatabaseDescriptor.getLogFileLocation());
-        File[] files = logDir.listFiles();
-        Arrays.sort( files, new FileUtils.FileComparator() );
+            /* Rolls the current log file over to a new one. */
+            setNextFileName();
+            String oldLogFile = logWriter_.getFileName();
+            //history_.add(oldLogFile);
+            logWriter_.close();
 
-        byte[] bytes = new byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
-        for ( File file : files )
-        {
-            CommitLog clog = new CommitLog( file );
-            clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(bytes, 0, bytes.length);
-            CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+            /* point reader/writer to a new commit log file. */
+            // logWriter_ = SequenceFile.writer(logFile_);
+            logWriter_ = CommitLog.createWriter(logFile_);
+            /* squirrel away the old commit log header */
+            clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
             /*
-            StringBuilder sb = new StringBuilder("");
-            for ( byte b : bytes )
-            {
-                sb.append(b);
-                sb.append(" ");
-            }
+             * We need to zero out positions because the positions in
+             * the old file do not make sense in the new one.
             */
-            System.out.println("FILE:" + file);
-            System.out.println(clHeader.toString());
+            clHeader_.zeroPositions();
+            writeCommitLogHeader(clHeader_.toByteArray(), false);
+            // Get the list of files in commit log directory if it is greater than a certain number
+            // Force flush all the column families that way we ensure that a slowly populated column family is not screwing up
+            // by accumulating the commit logs .
         }
     }
 }
\ No newline at end of file
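
The rewritten checkThresholdAndRollLog() now propagates IOException instead of swallowing it; the roll itself keeps the dirty bits but discards the old offsets. A condensed sketch of the sequence:

    void rollLogSketch() throws IOException
    {
        if (logWriter_.getFileSize() < SEGMENT_SIZE)
            return;
        setNextFileName();
        String oldLogFile = logWriter_.getFileName(); // still the old file's name
        logWriter_.close();
        logWriter_ = CommitLog.createWriter(logFile_);
        // squirrel away a copy of the old header so discard() can AND against it
        clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
        // dirty bits carry over to the new log, but offsets into the old file
        // are meaningless there, so zero the positions
        clHeader_.zeroPositions();
        writeCommitLogHeader(clHeader_.toByteArray(), false);
    }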

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=786244&r1=786243&r2=786244&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLogHeader.java Thu Jun 18 20:22:14 2009
@@ -19,13 +19,10 @@
 package org.apache.cassandra.db;
 
 import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.*;
 
 /**
  * Author : Avinash Lakshman ( [email protected]) & Prashant Malik ( [email protected] )
@@ -34,7 +31,7 @@
 class CommitLogHeader
 {
     private static ICompactSerializer<CommitLogHeader> serializer_;
-    
+
     static
     {
         serializer_ = new CommitLogHeaderSerializer();
@@ -115,13 +112,13 @@
         return true;
     }
     
-    private byte[] header_ = new byte[0];
-    private int[] position_ = new int[0];
+    private byte[] dirty = new byte[0]; // columnfamilies with un-flushed data in this CommitLog
+    private int[] lastFlushedAt = new int[0]; // position at which each CF was last flushed
     
     CommitLogHeader(int size)
     {
-        header_ = new byte[size];
-        position_ = new int[size];
+        dirty = new byte[size];
+        lastFlushedAt = new int[size];
     }
     
     /*
@@ -129,89 +126,84 @@
      * also builds an index of position to column family
      * Id.
     */
-    CommitLogHeader(byte[] header, int[] position)
+    CommitLogHeader(byte[] dirty, int[] lastFlushedAt)
     {
-        header_ = header;
-        position_ = position;
+        this.dirty = dirty;
+        this.lastFlushedAt = lastFlushedAt;
     }
     
     CommitLogHeader(CommitLogHeader clHeader)
     {
-        header_ = new byte[clHeader.header_.length];
-        System.arraycopy(clHeader.header_, 0, header_, 0, header_.length);
-        position_ = new int[clHeader.position_.length];
-        System.arraycopy(clHeader.position_, 0, position_, 0, position_.length);
+        dirty = new byte[clHeader.dirty.length];
+        System.arraycopy(clHeader.dirty, 0, dirty, 0, dirty.length);
+        lastFlushedAt = new int[clHeader.lastFlushedAt.length];
+        System.arraycopy(clHeader.lastFlushedAt, 0, lastFlushedAt, 0, lastFlushedAt.length);
     }
     
     byte get(int index)
     {
-        return header_[index];
+        return dirty[index];
     } 
     
     int getPosition(int index)
     {
-        return position_[index];
+        return lastFlushedAt[index];
     }
     
     void turnOn(int index, long position)
     {
-        turnOn(header_, index, position);
+        turnOn(dirty, index, position);
     }
     
     void turnOn(byte[] bytes, int index, long position)
     {
         bytes[index] = (byte)1;
-        position_[index] = (int)position;
+        lastFlushedAt[index] = (int)position;
     }
     
     void turnOff(int index)
     {
-        turnOff(header_, index);
+        turnOff(dirty, index);
     }
     
     void turnOff(byte[] bytes, int index)
     {
         bytes[index] = (byte)0;
-        position_[index] = 0; 
+        lastFlushedAt[index] = 0;
     }
     
     boolean isSafeToDelete() throws IOException
     {
-        return isSafeToDelete(header_);
-    }
-    
-    boolean isSafeToDelete(byte[] bytes) throws IOException
-    {        
-        for ( byte b : bytes )
+        for (byte b : dirty)
         {
-            if ( b == 1 )
+            if (b == 1)
                 return false;
         }
         return true;
     }
-    
+
     byte[] getBitSet()
     {
-        return header_;
+        return dirty;
     }
     
     int[] getPositions()
     {
-        return position_;
+        return lastFlushedAt;
     }
     
     void zeroPositions()
     {
-        int size = position_.length;
-        position_ = new int[size];
+        int size = lastFlushedAt.length;
+        lastFlushedAt = new int[size];
     }
     
     void and (CommitLogHeader commitLogHeader)
     {        
-        byte[] clh2 = commitLogHeader.header_;
-        for ( int i = 0; i < header_.length; ++i )
+        byte[] clh2 = commitLogHeader.dirty;
+        for ( int i = 0; i < dirty.length; ++i )
         {            
-            header_[i] = (byte)(header_[i] & clh2[i]);
+            dirty[i] = (byte)(dirty[i] & clh2[i]);
         }
     }
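
With the renames, each column family's state in the header is just a dirty byte plus a last-flushed offset. A self-contained toy model of that lifecycle (not the real class):

    class HeaderModel
    {
        final byte[] dirty;         // 1 when the CF has un-flushed data in this log
        final int[] lastFlushedAt;  // offset of the CF's last flush

        HeaderModel(int cfCount)
        {
            dirty = new byte[cfCount];
            lastFlushedAt = new int[cfCount];
        }

        void turnOn(int cf, int position) { dirty[cf] = 1; lastFlushedAt[cf] = position; }
        void turnOff(int cf)              { dirty[cf] = 0; lastFlushedAt[cf] = 0; }

        boolean isSafeToDelete()
        {
            for (byte b : dirty)
                if (b == 1)
                    return false;
            return true;
        }
    }

    // usage: a write dirties the CF; the flush cleans it again
    // HeaderModel h = new HeaderModel(2);
    // h.turnOn(0, 512);   // h.isSafeToDelete() == false
    // h.turnOff(0);       // h.isSafeToDelete() == true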
     
@@ -226,16 +218,16 @@
     public String toString()
     {
         StringBuilder sb = new StringBuilder("");        
-        for ( int i = 0; i < header_.length; ++i )
+        for ( int i = 0; i < dirty.length; ++i )
         {
-            sb.append(header_[i]);
+            sb.append(dirty[i]);
             sb.append(":");
             Table table = Table.open( DatabaseDescriptor.getTables().get(0));
             sb.append(table.getColumnFamilyName(i));
             sb.append(" ");
         }        
         sb.append(" | " );        
-        for ( int position : position_ )
+        for ( int position : lastFlushedAt)
         {
             sb.append(position);
             sb.append(" ");

