Author: jbellis
Date: Tue May 5 17:03:15 2009
New Revision: 771934
URL: http://svn.apache.org/viewvc?rev=771934&view=rev
Log:
A. Make hint generation include a real timestamp so we can do meaningful deletes.
B. Call removeDeleted on the data we read locally to purge tombstones.
C. Because of (B), any supercolumn without subcolumns simply won't exist, so we
know we can skip re-deleting the endpoint data; deleteKey therefore becomes
deleteHintedData.
D. Because deleted data is not immediately purged, increase the scheduled
interval from 20 to 60 minutes to reduce the load of scanning the hint CF.
patch by Jun Rao and jbellis for CASSANDRA-34
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Tue May 5 17:03:15 2009
@@ -641,7 +641,7 @@
}
}
else if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <=
gcBefore)
- || c.timestamp() < cf.getMarkedForDeleteAt())
+ || c.timestamp() <= cf.getMarkedForDeleteAt())
{
cf.remove(cname);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Tue May 5 17:03:15 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -53,7 +54,7 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ =
Logger.getLogger(HintedHandOffManager.class);
public static final String key_ = "HintedHandOffKey";
- final static long intervalInMins_ = 20;
+ final static long intervalInMins_ = 60;
private ScheduledExecutorService executor_ = new
DebuggableScheduledThreadPoolExecutor(1, new
ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
@@ -85,26 +86,61 @@
Table table = Table.open(DatabaseDescriptor.getTables().get(0));
Row row = table.get(key);
- RowMutation rm = new
RowMutation(DatabaseDescriptor.getTables().get(0), row);
+ Row purgedRow = new Row(key);
+ for (ColumnFamily cf : row.getColumnFamilies())
+ {
+ purgedRow.addColumnFamily(ColumnFamilyStore.removeDeleted(cf));
+ }
+ RowMutation rm = new
RowMutation(DatabaseDescriptor.getTables().get(0), purgedRow);
Message message = rm.makeRowMutationMessage();
-
QuorumResponseHandler<Boolean> quorumResponseHandler = new
QuorumResponseHandler<Boolean>(1, new WriteResponseResolver());
MessagingService.getMessagingInstance().sendRR(message, new
EndPoint[]{ endPoint }, quorumResponseHandler);
+
return quorumResponseHandler.get();
}
- private static void deleteEndPoint(String endpointAddress, String key)
throws Exception
+ private static void deleteEndPoint(String endpointAddress, String key,
long timestamp) throws Exception
{
RowMutation rm = new
RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress,
System.currentTimeMillis());
+ rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, timestamp);
rm.apply();
}
- private static void deleteKey(String key) throws Exception
+ private static void deleteHintedData(String key) throws Exception
{
- // delete the hint record
+ // delete the row from Application CFs: find the largest timestamp in
any of
+ // the data columns, and delete the entire CF with that value for the
tombstone.
+
+ // Note that we delete all data associated with the key: this may be
more than
+ // we sent earlier in sendMessage, since HH is not serialized with
writes.
+ // This is sub-optimal but okay, since HH is just an effort to make a
recovering
+ // node more consistent than it would have been; we can rely on the
other
+ // consistency mechanisms to finish the job in this corner case.
RowMutation rm = new
RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = table.get(key); // not necessary to do removeDeleted here
+ Collection<ColumnFamily> cfs = row.getColumnFamilies();
+ for (ColumnFamily cf : cfs)
+ {
+ Set<IColumn> columns = cf.getAllColumns();
+ long maxTS = Long.MIN_VALUE;
+ if (!cf.isSuper())
+ {
+ for (IColumn col : columns)
+ maxTS = Math.max(maxTS, col.timestamp());
+ }
+ else
+ {
+ for (IColumn col : columns)
+ {
+ maxTS = Math.max(maxTS, col.timestamp());
+ Collection<IColumn> subColumns = col.getSubColumns();
+ for (IColumn subCol : subColumns)
+ maxTS = Math.max(maxTS, subCol.timestamp());
+ }
+ }
+ rm.delete(cf.name(), maxTS);
+ }
rm.apply();
}
@@ -120,10 +156,9 @@
// 6. Do major compaction to clean up all deletes etc.
// 7. I guess we r done
Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- ColumnFamily hintColumnFamily = null;
try
{
- hintColumnFamily = table.get(key_, Table.hints_);
+ ColumnFamily hintColumnFamily =
ColumnFamilyStore.removeDeleted(table.get(key_, Table.hints_),
Integer.MAX_VALUE);
if (hintColumnFamily == null)
{
columnFamilyStore.forceFlush();
@@ -138,26 +173,18 @@
for (IColumn keyColumn : keys)
{
Collection<IColumn> endpoints = keyColumn.getSubColumns();
- // endpoints could be null if the server were terminated
during a previous runHints
- // after deleteEndPoint but before deleteKey.
- boolean allsuccess = true;
- if (endpoints != null)
+ int deleted = 0;
+ for (IColumn endpoint : endpoints)
{
- for (IColumn endpoint : endpoints)
+ if (sendMessage(endpoint.name(), keyColumn.name()))
{
- if (sendMessage(endpoint.name(), keyColumn.name()))
- {
- deleteEndPoint(endpoint.name(), keyColumn.name());
- }
- else
- {
- allsuccess = false;
- }
+ deleteEndPoint(endpoint.name(), keyColumn.name(),
keyColumn.timestamp());
+ deleted++;
}
}
- if (allsuccess)
+ if (deleted == endpoints.size())
{
- deleteKey(keyColumn.name());
+ deleteHintedData(keyColumn.name());
}
}
columnFamilyStore.forceFlush();
@@ -181,10 +208,9 @@
// 2. For each key read the list of recepients if teh endpoint matches
send
// 3. Delete that recepient from the key if write was successful
Table table = Table.open(DatabaseDescriptor.getTables().get(0));
- ColumnFamily hintedColumnFamily = null;
try
{
- hintedColumnFamily = table.get(key_, Table.hints_);
+ ColumnFamily hintedColumnFamily = table.get(key_, Table.hints_);
if (hintedColumnFamily == null)
{
return;
@@ -198,18 +224,14 @@
for (IColumn keyColumn : keys)
{
Collection<IColumn> endpoints = keyColumn.getSubColumns();
- if (endpoints == null)
- {
- deleteKey(keyColumn.name());
- continue;
- }
for (IColumn endpoint : endpoints)
{
- if (endpoint.name().equals(endPoint.getHost()))
+ if (endpoint.name().equals(endPoint.getHost()) &&
sendMessage(endpoint.name(), keyColumn.name()))
{
- if (sendMessage(endpoint.name(), keyColumn.name()))
+ deleteEndPoint(endpoint.name(), keyColumn.name(),
keyColumn.timestamp());
+ if (endpoints.size() == 1)
{
- deleteEndPoint(endpoint.name(), keyColumn.name());
+ deleteHintedData(keyColumn.name());
}
}
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=771934&r1=771933&r2=771934&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
Tue May 5 17:03:15 2009
@@ -120,7 +120,7 @@
void addHints(String hint) throws IOException
{
String cfName = Table.hints_ + ":" + hint;
- add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+ add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
}
/*