Author: jbellis
Date: Thu May  7 17:13:05 2009
New Revision: 772713

URL: http://svn.apache.org/viewvc?rev=772713&view=rev
Log:
clean up read/write path more; include message id in logging so we can trace 
what
happens to an individual op.  patch by jbellis.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeCommand.java 
Thu May  7 17:13:05 2009
@@ -45,6 +45,16 @@
         dib.reset(bytes, bytes.length);
         return serializer.deserialize(new DataInputStream(dib));
     }
+
+    public String toString()
+    {
+        return "RangeCommand(" +
+               "table='" + table + '\'' +
+               ", startWith='" + startWith + '\'' +
+               ", stopAt='" + stopAt + '\'' +
+               ", maxResults=" + maxResults +
+               ')';
+    }
 }
 
 class RangeCommandSerializer implements ICompactSerializer<RangeCommand>

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java 
Thu May  7 17:13:05 2009
@@ -95,7 +95,7 @@
             System.arraycopy(readCtx.bufOut_.getData(), 0, bytes, 0, 
bytes.length);
 
             Message response = 
message.getReply(StorageService.getLocalStorageEndPoint(), bytes);
-            logger_.debug("Read key " + readCommand.key + "; sending response 
to " + message.getFrom());
+            logger_.debug("Read key " + readCommand.key + "; sending response 
to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.getMessagingInstance().sendOneWay(response, 
message.getFrom());
 
             /* Do read repair if header of the message says so */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Row.java Thu May 
 7 17:13:05 2009
@@ -167,7 +167,6 @@
 
     public byte[] digest()
     {
-        long start = System.currentTimeMillis();
         Set<String> cfamilies = columnFamilies_.keySet();
         byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
         for (String cFamily : cfamilies)
@@ -181,7 +180,6 @@
                 xorHash = FBUtilities.xor(xorHash, 
columnFamilies_.get(cFamily).digest());
             }
         }
-        logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start) + 
" ms.");
         return xorHash;
     }
 

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
 Thu May  7 17:13:05 2009
@@ -91,7 +91,7 @@
 
             WriteResponse response = new WriteResponse(rm.table(), rm.key(), 
true);
             Message responseMessage = 
WriteResponse.makeWriteResponseMessage(message, response);
-            logger_.debug("Mutation applied in " + (end - start) + "ms.  
Sending response to " +  message.getFrom() + " for key :" + rm.key());
+            logger_.debug("Mutation applied in " + (end - start) + "ms.  
Sending response to " + message.getMessageId() + "@" + message.getFrom());
             
MessagingService.getMessagingInstance().sendOneWay(responseMessage, 
message.getFrom());
         }
         catch (IOException e)

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
 Thu May  7 17:13:05 2009
@@ -34,7 +34,7 @@
         IAsyncCallback cb = MessagingService.getRegisteredCallback(messageId);
         if ( cb != null )
         {
-            logger_.info("Processing response on a callback from " + 
message.getFrom());
+            logger_.info("Processing response on a callback from " + 
message.getMessageId() + "@" + message.getFrom());
             cb.response(message);
         }
         else
@@ -42,7 +42,7 @@
             IAsyncResult ar = MessagingService.getAsyncResult(messageId);
             if ( ar != null )
             {
-                logger_.info("Processing response on an async result from " + 
message.getFrom());
+                logger_.info("Processing response on an async result from " + 
message.getMessageId() + "@" + message.getFrom());
                 ar.result(message);
             }
         }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
 Thu May  7 17:13:05 2009
@@ -508,7 +508,7 @@
             throw new InvalidRequestException("range queries may only be 
performed against an order-preserving partitioner");
         }
 
-        return StorageProxy.getRange(new RangeCommand(tablename, startWith, 
stopAt, maxResults));
+        return StorageProxy.getKeyRange(new RangeCommand(tablename, startWith, 
stopAt, maxResults));
     }
 
     /*

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
 Thu May  7 17:13:05 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.utils.ICachetable;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
 
 class ConsistencyManager implements Runnable
@@ -49,7 +50,6 @@
                
                public void response(Message msg)
                {
-                       logger_.debug("Received reponse : " + msg.toString());
                        responses_.add(msg);
                        if ( responses_.size() == 
ConsistencyManager.this.replicas_.size() )
                                handleDigestResponses();
@@ -91,11 +91,10 @@
             /* Add the local storage endpoint to the replicas_ list */
             replicas_.add(StorageService.getLocalStorageEndPoint());
                        IAsyncCallback responseHandler = new 
DataRepairHandler(ConsistencyManager.this.replicas_.size(), 
readResponseResolver); 
-                       String table = DatabaseDescriptor.getTables().get(0);
             ReadCommand readCommand = constructReadMessage(false);
-                       // ReadMessage readMessage = new ReadMessage(table, 
row_.key(), columnFamily_);
             Message message = readCommand.makeReadMessage();
-                       MessagingService.getMessagingInstance().sendRR(message, 
replicas_.toArray( new EndPoint[0] ), responseHandler);                 
+            logger_.debug("Performing read repair for " + readCommand_.key + " 
to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
+                       MessagingService.getMessagingInstance().sendRR(message, 
replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
                }
        }
        
@@ -140,8 +139,7 @@
                        }
                        catch ( DigestMismatchException ex )
                        {
-                               logger_.info("We should not be coming here 
under any circumstances ...");
-                               logger_.info(LogUtil.throwableToString(ex));
+                               throw new RuntimeException(ex);
                        }
                }
        }
@@ -161,17 +159,16 @@
 
        public void run()
        {
-               logger_.debug(" Run the consistency checks for " + 
readCommand_.getColumnFamilyName());         
         ReadCommand readCommandDigestOnly = constructReadMessage(true);
                try
                {
-                       Message messageDigestOnly = 
readCommandDigestOnly.makeReadMessage();
-                       IAsyncCallback digestResponseHandler = new 
DigestResponseHandler();
-                       
MessagingService.getMessagingInstance().sendRR(messageDigestOnly, 
replicas_.toArray(new EndPoint[replicas_.size()]), digestResponseHandler);
+                       Message message = 
readCommandDigestOnly.makeReadMessage();
+            logger_.debug("Reading consistency digest for " + readCommand_.key 
+ " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") 
+ "]");
+            MessagingService.getMessagingInstance().sendRR(message, 
replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
                }
-               catch ( IOException ex )
+               catch (IOException ex)
                {
-                       logger_.info(LogUtil.throwableToString(ex));
+                       throw new RuntimeException(ex);
                }
        }
     

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
 Thu May  7 17:13:05 2009
@@ -214,7 +214,6 @@
 
     public void onChange(EndPoint endpoint, EndPointState epState)
     {
-        logger_.debug("CHANGE IN STATE FOR @ StorageLoadBalancer " + endpoint);
         // load information for this specified endpoint for load balancing 
         ApplicationState loadInfoState = 
epState.getApplicationState(LoadDisseminator.loadInfo_);
         if ( loadInfoState != null )

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=772713&r1=772712&r2=772713&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
 Thu May  7 17:13:05 2009
@@ -117,15 +117,17 @@
                        Map<EndPoint, EndPoint> endpointMap = 
StorageService.instance().getNStorageEndPointMap(rm.key());
                        // TODO: throw a thrift exception if we do not have N 
nodes
                        Map<EndPoint, Message> messageMap = 
createWriteMessages(rm, endpointMap);
-            logger.debug("insert writing key " + rm.key() + " to [" + 
StringUtils.join(messageMap.keySet(), ", ") + "]");
                        for (Map.Entry<EndPoint, Message> entry : 
messageMap.entrySet())
                        {
-                               
MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), 
entry.getKey());
+                Message message = entry.getValue();
+                EndPoint endpoint = entry.getKey();
+                logger.debug("insert writing key " + rm.key() + " to " + 
message.getMessageId() + "@" + endpoint);
+                MessagingService.getMessagingInstance().sendOneWay(message, 
endpoint);
                        }
                }
         catch (IOException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("error inserting key " + rm.key(), e);
         }
         finally
         {
@@ -151,7 +153,7 @@
                     DatabaseDescriptor.getReplicationFactor(),
                     new WriteResponseResolver());
             EndPoint[] endpoints = 
StorageService.instance().getNStorageEndPoint(rm.key());
-            logger.debug("insertBlocking writing key " + rm.key() + " to [" + 
StringUtils.join(endpoints, ", ") + "]");
+            logger.debug("insertBlocking writing key " + rm.key() + " to " + 
message.getMessageId() + "@[" + StringUtils.join(endpoints, ", ") + "]");
             // TODO: throw a thrift exception if we do not have N nodes
 
             MessagingService.getMessagingInstance().sendRR(message, endpoints, 
quorumResponseHandler);
@@ -160,7 +162,7 @@
         }
         catch (Exception e)
         {
-            logger.error(e);
+            logger.error("error writing key " + rm.key(), e);
             throw new UnavailableException();
         }
         finally
@@ -240,8 +242,8 @@
     {
         EndPoint endPoint = 
StorageService.instance().findSuitableEndPoint(command.key);
         assert endPoint != null;
-        logger.debug("weakreadremote reading " + command + " from " + 
endPoint);
         Message message = command.makeReadMessage();
+        logger.debug("weakreadremote reading " + command + " from " + 
message.getMessageId() + "@" + endPoint);
         message.addHeader(ReadCommand.DO_REPAIR, 
ReadCommand.DO_REPAIR.getBytes());
         IAsyncResult iar = 
MessagingService.getMessagingInstance().sendRR(message, endPoint);
         byte[] body;
@@ -251,8 +253,8 @@
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException(e);
-            // TODO retry to a different endpoint
+            throw new RuntimeException("error reading key " + command.key, e);
+            // TODO retry to a different endpoint?
         }
         DataInputBuffer bufIn = new DataInputBuffer();
         bufIn.reset(body, body.length);
@@ -329,7 +331,7 @@
         }
         catch (IOException ex)
         {
-            throw new RuntimeException(ex);
+            throw new RuntimeException("error touching key " + key, ex);
         }
         finally
         {
@@ -461,12 +463,14 @@
         */
         endPoints[0] = dataPoint;
         messages[0] = message;
+        logger.debug("strongread reading data for " + command + " from " + 
message.getMessageId() + "@" + dataPoint);
         for (int i = 1; i < endPoints.length; i++)
         {
-            endPoints[i] = endpointList.get(i - 1);
+            EndPoint digestPoint = endpointList.get(i - 1);
+            endPoints[i] = digestPoint;
             messages[i] = messageDigestOnly;
+            logger.debug("strongread reading digest for " + command + " from " 
+ messageDigestOnly.getMessageId() + "@" + digestPoint);
         }
-        logger.debug("strongread reading " + command + " from " + 
StringUtils.join(endPoints, ", "));
 
         try
         {
@@ -495,7 +499,7 @@
                 catch (DigestMismatchException e)
                 {
                     // TODO should this be a thrift exception?
-                    throw new RuntimeException(e);
+                    throw new RuntimeException("digest mismatch reading key " 
+ command.key, e);
                 }
             }
         }
@@ -592,7 +596,7 @@
         }
         catch (TimeoutException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("timeout reading keys " + 
StringUtils.join(rows.keySet(), ", "), e);
         }
         return rows;
     }
@@ -660,7 +664,7 @@
         return row;
     }
 
-    static List<String> getRange(RangeCommand command)
+    static List<String> getKeyRange(RangeCommand command)
     {
         long startTime = System.currentTimeMillis();
         try
@@ -675,7 +679,7 @@
         }
         catch (Exception e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("error reading keyrange " + command, e);
         }
         finally
         {


Reply via email to