Author: jbellis
Date: Tue May  5 17:03:15 2009
New Revision: 771934

URL: http://svn.apache.org/viewvc?rev=771934&view=rev
Log:
 A make hint generation include a real timestamp so we can do meaningful deletes
 B call removeDeleted on the data we read locally to purge tombstones
 C because of (B), any supercolumn w/o subcolumns simply won't exist, so we
know we can skip re-deleting the endpoint data; deleteKey therefore becomes
deleteHintedData.
 D because deleted data is not immediately purged, increase the scheduled
interval from 20 to 60 minutes to reduce the load of scanning the hint CF.

patch by Jun Rao and jbellis for CASSANDRA-34

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Tue May  5 17:03:15 2009
@@ -641,7 +641,7 @@
                 }
             }
             else if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= 
gcBefore)
-                     || c.timestamp() < cf.getMarkedForDeleteAt())
+                     || c.timestamp() <= cf.getMarkedForDeleteAt())
             {
                 cf.remove(cname);
             }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
 Tue May  5 17:03:15 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -53,7 +54,7 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = 
Logger.getLogger(HintedHandOffManager.class);
     public static final String key_ = "HintedHandOffKey";
-    final static long intervalInMins_ = 20;
+    final static long intervalInMins_ = 60;
     private ScheduledExecutorService executor_ = new 
DebuggableScheduledThreadPoolExecutor(1, new 
ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
 
 
@@ -85,26 +86,61 @@
 
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
         Row row = table.get(key);
-        RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), row);
+        Row purgedRow = new Row(key);
+        for (ColumnFamily cf : row.getColumnFamilies())
+        {
+            purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
+        }
+        RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), purgedRow);
         Message message = rm.makeRowMutationMessage();
-
         QuorumResponseHandler<Boolean> quorumResponseHandler = new 
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
         MessagingService.getMessagingInstance().sendRR(message, new 
EndPoint[]{ endPoint }, quorumResponseHandler);
+
         return quorumResponseHandler.get();
     }
 
-    private static void deleteEndPoint(String endpointAddress, String key) 
throws Exception
+    private static void deleteEndPoint(String endpointAddress, String key, 
long timestamp) throws Exception
     {
         RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, 
System.currentTimeMillis());
+        rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
         rm.apply();
     }
 
-    private static void deleteKey(String key) throws Exception
+    private static void deleteHintedData(String key) throws Exception
     {
-        // delete the hint record
+        // delete the row from Application CFs: find the largest timestamp in 
any of
+        // the data columns, and delete the entire CF with that value for the 
tombstone.
+
+        // Note that we delete all data associated with the key: this may be 
more than
+        // we sent earlier in sendMessage, since HH is not serialized with 
writes.
+        // This is sub-optimal but okay, since HH is just an effort to make a 
recovering
+        // node more consistent than it would have been; we can rely on the 
other
+        // consistency mechanisms to finish the job in this corner case.
         RowMutation rm = new 
RowMutation(DatabaseDescriptor.getTables().get(0), key_);
-        rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Row row = table.get(key); // not necessary to do removeDeleted here
+        Collection<ColumnFamily> cfs = row.getColumnFamilies();
+        for (ColumnFamily cf : cfs)
+        {
+            Set<IColumn> columns = cf.getAllColumns();
+            long maxTS = Long.MIN_VALUE;
+            if (!cf.isSuper())
+            {
+                for (IColumn col : columns)
+                    maxTS = Math.max(maxTS, col.timestamp());
+            }
+            else
+            {
+                for (IColumn col : columns)
+                {
+                    maxTS = Math.max(maxTS, col.timestamp());
+                    Collection<IColumn> subColumns = col.getSubColumns();
+                    for (IColumn subCol : subColumns)
+                        maxTS = Math.max(maxTS, subCol.timestamp());
+                }
+            }
+            rm.delete(cf.name(), maxTS);
+        }
         rm.apply();
     }
 
@@ -120,10 +156,9 @@
         // 6. Do major compaction to clean up all deletes etc.
         // 7. I guess we r done
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        ColumnFamily hintColumnFamily = null;
         try
         {
-            hintColumnFamily = table.get(key_, Table.hints_);
+            ColumnFamily hintColumnFamily = 
ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_), 
Integer.MAX_VALUE);
             if (hintColumnFamily == null)
             {
                 columnFamilyStore.forceFlush();
@@ -138,26 +173,18 @@
             for (IColumn keyColumn : keys)
             {
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                // endpoints could be null if the server were terminated 
during a previous runHints
-                // after deleteEndPoint but before deleteKey.
-                boolean allsuccess = true;
-                if (endpoints != null)
+                int deleted = 0;
+                for (IColumn endpoint : endpoints)
                 {
-                    for (IColumn endpoint : endpoints)
+                    if (sendMessage(endpoint.name(), keyColumn.name()))
                     {
-                        if (sendMessage(endpoint.name(), keyColumn.name()))
-                        {
-                            deleteEndPoint(endpoint.name(), keyColumn.name());
-                        }
-                        else
-                        {
-                            allsuccess = false;
-                        }
+                        deleteEndPoint(endpoint.name(), keyColumn.name(), 
keyColumn.timestamp());
+                        deleted++;
                     }
                 }
-                if (allsuccess)
+                if (deleted == endpoints.size())
                 {
-                    deleteKey(keyColumn.name());
+                    deleteHintedData(keyColumn.name());
                 }
             }
             columnFamilyStore.forceFlush();
@@ -181,10 +208,9 @@
         // 2. For each key read the list of recepients if teh endpoint matches 
send
         // 3. Delete that recepient from the key if write was successful
         Table table = Table.open(DatabaseDescriptor.getTables().get(0));
-        ColumnFamily hintedColumnFamily = null;
         try
         {
-            hintedColumnFamily = table.get(key_, Table.hints_);
+            ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
             if (hintedColumnFamily == null)
             {
                 return;
@@ -198,18 +224,14 @@
             for (IColumn keyColumn : keys)
             {
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                if (endpoints == null)
-                {
-                    deleteKey(keyColumn.name());
-                    continue;
-                }
                 for (IColumn endpoint : endpoints)
                 {
-                    if (endpoint.name().equals(endPoint.getHost()))
+                    if (endpoint.name().equals(endPoint.getHost()) && 
sendMessage(endpoint.name(), keyColumn.name()))
                     {
-                        if (sendMessage(endpoint.name(), keyColumn.name()))
+                        deleteEndPoint(endpoint.name(), keyColumn.name(), 
keyColumn.timestamp());
+                        if (endpoints.size() == 1)
                         {
-                            deleteEndPoint(endpoint.name(), keyColumn.name());
+                            deleteHintedData(keyColumn.name());
                         }
                     }
                 }

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java 
Tue May  5 17:03:15 2009
@@ -120,7 +120,7 @@
     void addHints(String hint) throws IOException
     {
         String cfName = Table.hints_ + ":" + hint;
-        add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+        add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
     }
 
     /*


Reply via email to