Author: jbellis
Date: Wed Sep 30 19:39:57 2009
New Revision: 820417

URL: http://svn.apache.org/viewvc?rev=820417&view=rev
Log:
formatting + cleanup.  patch by jbellis for CASSANDRA-462

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Wed 
Sep 30 19:39:57 2009
@@ -23,6 +23,7 @@
 import java.security.MessageDigest;
 import java.io.IOException;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -37,6 +38,8 @@
 
 public final class Column implements IColumn
 {
+    private static Logger logger_ = Logger.getLogger(Column.class);
+
     private static ColumnSerializer serializer_ = new ColumnSerializer();
 
     public static ColumnSerializer serializer()

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java 
Wed Sep 30 19:39:57 2009
@@ -115,20 +115,12 @@
         boolean isDigest = dis.readBoolean();
         
         Row row = null;
-        if ( !isDigest )
+        if (!isDigest)
         {
             row = Row.serializer().deserialize(dis);
         }
-               
-               ReadResponse rmsg = null;
-       if( isDigest  )
-        {
-               rmsg =  new ReadResponse(digest);
-        }
-       else
-        {
-               rmsg =  new ReadResponse(row);
-        }
+
+        ReadResponse rmsg = isDigest ? new ReadResponse(digest) : new 
ReadResponse(row);
         rmsg.setIsDigestQuery(isDigest);
        return rmsg;
     } 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
Wed Sep 30 19:39:57 2009
@@ -75,9 +75,8 @@
             }
             ReadCommand readCommand = 
ReadCommand.serializer().deserialize(readCtx.bufIn_);
             Table table = Table.open(readCommand.table);
-            Row row = null;
-            row = readCommand.getRow(table);
-            ReadResponse readResponse = null;
+            Row row = readCommand.getRow(table);
+            ReadResponse readResponse;
             if (readCommand.isDigestQuery())
             {
                 readResponse = new ReadResponse(row.digest());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Wed Sep 
30 19:39:57 2009
@@ -29,13 +29,11 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class Row
 {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 Wed Sep 30 19:39:57 2009
@@ -23,7 +23,6 @@
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
@@ -129,21 +128,17 @@
                
                public void callMe(String key, String value)
                {
-                       handleResponses();
-               }
-               
-               private void handleResponses()
-               {
-                       try
+            try
                        {
                                readResponseResolver_.resolve(new 
ArrayList<Message>(responses_));
-                       }
-                       catch ( DigestMismatchException ex )
-                       {
-                               throw new RuntimeException(ex);
-                       }
-               }
-       }
+            }
+            catch (Exception ex)
+            {
+                throw new RuntimeException(ex);
+            }
+        }
+
+    }
 
        private static long scheduledTimeMillis_ = 600;
        private static ICachetable<String, String> readRepairTable_ = new 
Cachetable<String, String>(scheduledTimeMillis_);

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/IResponseResolver.java
 Wed Sep 30 19:39:57 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import java.util.List;
+import java.io.IOException;
 
 import org.apache.cassandra.net.Message;
 
@@ -32,7 +33,7 @@
         * repairs . Hence you need to derive a response resolver based on your
         * needs from this interface.
         */
-       public T resolve(List<Message> responses) throws 
DigestMismatchException;
+       public T resolve(List<Message> responses) throws 
DigestMismatchException, IOException;
        public boolean isDataPresent(List<Message> responses);
 
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
 Wed Sep 30 19:39:57 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -55,64 +56,67 @@
         startTime_ = System.currentTimeMillis();
     }
     
-    public T get() throws TimeoutException, DigestMismatchException
+    public T get() throws TimeoutException, DigestMismatchException, 
IOException
     {
-       lock_.lock();
+        lock_.lock();
         try
-        {            
-            boolean bVal = true;            
+        {
+            boolean bVal = true;
             try
             {
-               if ( !done_.get() )
+                if (!done_.get())
                 {
                     long timeout = System.currentTimeMillis() - startTime_ + 
DatabaseDescriptor.getRpcTimeout();
-                    if(timeout > 0)
+                    if (timeout > 0)
+                    {
                         bVal = condition_.await(timeout, 
TimeUnit.MILLISECONDS);
+                    }
                     else
+                    {
                         bVal = false;
+                    }
                 }
             }
-            catch ( InterruptedException ex )
+            catch (InterruptedException ex)
             {
-                if (logger_.isDebugEnabled())
-                  logger_.debug( LogUtil.throwableToString(ex) );
+                throw new AssertionError(ex);
             }
-            
-            if ( !bVal && !done_.get() )
+
+            if (!bVal && !done_.get())
             {
                 StringBuilder sb = new StringBuilder("");
-                for ( Message message : responses_ )
+                for (Message message : responses_)
                 {
-                    sb.append(message.getFrom());                    
-                }                
-                throw new TimeoutException("Operation timed out - received 
only " +  responses_.size() + " responses from " + sb.toString() + " .");
+                    sb.append(message.getFrom());
+                }
+                throw new TimeoutException("Operation timed out - received 
only " + responses_.size() + " responses from " + sb.toString() + " .");
             }
         }
         finally
         {
             lock_.unlock();
-            for(Message response : responses_)
+            for (Message response : responses_)
             {
-               MessagingService.removeRegisteredCallback( 
response.getMessageId() );
+                
MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-       return responseResolver_.resolve( responses_);
+        return responseResolver_.resolve(responses_);
     }
     
     public void response(Message message)
     {
         lock_.lock();
         try
-        {            
-            if ( !done_.get() )
+        {
+            if (!done_.get())
             {
-               responses_.add( message );
-               if ( responses_.size() >= responseCount_ && 
responseResolver_.isDataPresent(responses_))
-               {
-                       done_.set(true);
-                       condition_.signal();                    
-               }
+                responses_.add(message);
+                if (responses_.size() >= responseCount_ && 
responseResolver_.isDataPresent(responses_))
+                {
+                    done_.set(true);
+                    condition_.signal();
+                }
             }
         }
         finally

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
 Wed Sep 30 19:39:57 2009
@@ -54,8 +54,8 @@
         * repair request should be scheduled.
         * 
         */
-       public Row resolve(List<Message> responses) throws 
DigestMismatchException
-       {
+       public Row resolve(List<Message> responses) throws 
DigestMismatchException, IOException
+    {
         long startTime = System.currentTimeMillis();
                Row retRow = null;
                List<Row> rowList = new ArrayList<Row>();
@@ -76,38 +76,31 @@
                {                                                   
             byte[] body = response.getMessageBody();
             bufIn.reset(body, body.length);
-            try
+            long start = System.currentTimeMillis();
+            ReadResponse result = ReadResponse.serializer().deserialize(bufIn);
+            if (logger_.isDebugEnabled())
+              logger_.debug( "Response deserialization time : " + 
(System.currentTimeMillis() - start) + " ms.");
+            if (result.isDigestQuery())
             {
-                long start = System.currentTimeMillis();
-                ReadResponse result = 
ReadResponse.serializer().deserialize(bufIn);
-                if (logger_.isDebugEnabled())
-                  logger_.debug( "Response deserialization time : " + 
(System.currentTimeMillis() - start) + " ms.");
-                       if(!result.isDigestQuery())
-                       {
-                               rowList.add(result.row());
-                               endPoints.add(response.getFrom());
-                               key = result.row().key();
-                               table = result.row().getTable();
-                       }
-                       else
-                       {
-                               digest = result.digest();
-                               isDigestQuery = true;
-                       }
+                digest = result.digest();
+                isDigestQuery = true;
             }
-            catch( IOException ex )
+            else
             {
-                logger_.info(LogUtil.throwableToString(ex));
+                rowList.add(result.row());
+                endPoints.add(response.getFrom());
+                key = result.row().key();
+                table = result.row().getTable();
             }
-               }
+        }
                // If there was a digest query compare it with all the data 
digests 
                // If there is a mismatch then throw an exception so that read 
repair can happen.
-               if(isDigestQuery)
-               {
-                       for(Row row: rowList)
-                       {
-                               if( !Arrays.equals(row.digest(), digest) )
-                               {
+        if (isDigestQuery)
+        {
+            for (Row row : rowList)
+            {
+                if (!Arrays.equals(row.digest(), digest))
+                {
                     /* Wrap the key as the context in this exception */
                                        throw new 
DigestMismatchException(row.key());
                                }
@@ -115,36 +108,36 @@
                }
                
         /* If the rowList is empty then we had some exception above. */
-        if ( rowList.size() == 0 )
+        if (rowList.size() == 0)
         {
             return retRow;
         }
-        
+
         /* Now calculate the resolved row */
-               retRow = new Row(table, key);
-               for (int i = 0 ; i < rowList.size(); i++)
-               {
-                       retRow.repair(rowList.get(i));                  
-               }
+        retRow = new Row(table, key);
+        for (int i = 0; i < rowList.size(); i++)
+        {
+            retRow.repair(rowList.get(i));
+        }
 
         // At  this point  we have the return row .
-               // Now we need to calculate the difference 
-               // so that we can schedule read repairs 
-               for (int i = 0 ; i < rowList.size(); i++)
-               {
-                       // since retRow is the resolved row it can be used as 
the super set
-                       Row diffRow = rowList.get(i).diff(retRow);
-                       if(diffRow == null) // no repair needs to happen
-                               continue;
-                       // create the row mutation message based on the diff 
and schedule a read repair 
-                       RowMutation rowMutation = new RowMutation(table, key);  
                                
-               for (ColumnFamily cf : diffRow.getColumnFamilies())
-               {
-                   rowMutation.add(cf);
-               }
+        // Now we need to calculate the difference
+        // so that we can schedule read repairs
+        for (int i = 0; i < rowList.size(); i++)
+        {
+            // since retRow is the resolved row it can be used as the super set
+            Row diffRow = rowList.get(i).diff(retRow);
+            if (diffRow == null) // no repair needs to happen
+                continue;
+            // create the row mutation message based on the diff and schedule 
a read repair
+            RowMutation rowMutation = new RowMutation(table, key);
+            for (ColumnFamily cf : diffRow.getColumnFamilies())
+            {
+                rowMutation.add(cf);
+            }
             RowMutationMessage rowMutationMessage = new 
RowMutationMessage(rowMutation);
-               
ReadRepairManager.instance().schedule(endPoints.get(i),rowMutationMessage);
-               }
+            ReadRepairManager.instance().schedule(endPoints.get(i), 
rowMutationMessage);
+        }
         if (logger_.isDebugEnabled())
             logger_.debug("resolve: " + (System.currentTimeMillis() - 
startTime) + " ms.");
                return retRow;
@@ -152,26 +145,26 @@
 
        public boolean isDataPresent(List<Message> responses)
        {
-               boolean isDataPresent = false;
-               for (Message response : responses)
-               {
+        boolean isDataPresent = false;
+        for (Message response : responses)
+        {
             byte[] body = response.getMessageBody();
-                       DataInputBuffer bufIn = new DataInputBuffer();
+            DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             try
             {
-                       ReadResponse result = 
ReadResponse.serializer().deserialize(bufIn);
-                       if(!result.isDigestQuery())
-                       {
-                               isDataPresent = true;
-                       }
+                ReadResponse result = 
ReadResponse.serializer().deserialize(bufIn);
+                if (!result.isDigestQuery())
+                {
+                    isDataPresent = true;
+                }
                 bufIn.close();
             }
-            catch(IOException ex)
+            catch (IOException ex)
             {
                 logger_.info(LogUtil.throwableToString(ex));
-            }                        
-               }
-               return isDataPresent;
-       }
+            }
+        }
+        return isDataPresent;
+    }
 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Wed Sep 30 19:39:57 2009
@@ -352,10 +352,7 @@
             Message message = command.makeReadMessage();
             Message messageDigestOnly = 
readMessageDigestOnly.makeReadMessage();
 
-            IResponseResolver<Row> readResponseResolver = new 
ReadResponseResolver();
-            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(
-                    DatabaseDescriptor.getQuorum(),
-                    readResponseResolver);
+            QuorumResponseHandler<Row> quorumResponseHandler = new 
QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new 
ReadResponseResolver());
             EndPoint dataPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
             List<EndPoint> endpointList = new 
ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
             /* Remove the local storage endpoint from the list. */
@@ -401,7 +398,7 @@
             }
             catch (DigestMismatchException ex)
             {
-                if ( DatabaseDescriptor.getConsistencyCheck())
+                if (DatabaseDescriptor.getConsistencyCheck())
                 {
                     IResponseResolver<Row> readResponseResolverRepair = new 
ReadResponseResolver();
                     QuorumResponseHandler<Row> quorumResponseHandlerRepair = 
new QuorumResponseHandler<Row>(
@@ -409,8 +406,7 @@
                             readResponseResolverRepair);
                     logger.info("DigestMismatchException: " + command.key);
                     Message messageRepair = command.makeReadMessage();
-                    
MessagingService.getMessagingInstance().sendRR(messageRepair, 
commandEndPoints.get(commandIndex),
-                            quorumResponseHandlerRepair);
+                    
MessagingService.getMessagingInstance().sendRR(messageRepair, 
commandEndPoints.get(commandIndex), quorumResponseHandlerRepair);
                     try
                     {
                         row = quorumResponseHandlerRepair.get();

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=820417&r1=820416&r2=820417&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java 
Wed Sep 30 19:39:57 2009
@@ -398,15 +398,12 @@
         return bytes;
     }
 
-    public static String bytesToHex(byte[] buf)
+    public static String bytesToHex(byte[] bytes)
     {
-        char[] chars = new char[2*buf.length];
-        for (int i = 0; i < buf.length; i++)
-        {
-            chars[i*2] = HEX_CHARS[(buf[i] & 0xF0) >>> 4];
-            chars[i*2+1] = HEX_CHARS[buf[i] & 0x0F];
-        }
-        return new String(chars);
+        StringBuilder sb = new StringBuilder();
+        for (byte b : bytes)
+            sb.append(Integer.toHexString(b & 0xff));
+        return sb.toString();
     }
 
     public static String mapToString(Map<?,?> map)


Reply via email to