Author: brandonwilliams
Date: Fri Jul 9 20:27:30 2010
New Revision: 962683
URL: http://svn.apache.org/viewvc?rev=962683&view=rev Log: Hinted handoff improvements; patch by brandonwilliams reviewed by jbellis for CASSANDRA-1223 Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=962683&r1=962682&r2=962683&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jul 9 20:27:30 2010 @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import org.apache.cassandra.db.IClock; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,12 +122,12 @@ public class HintedHandOffManager return true; } - private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF) throws IOException + private static void deleteHintKey(byte[] endpointAddress, byte[] key, byte[] tableCF, IClock clock) throws IOException { RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress); - rm.delete(new QueryPath(HINTS_CF, key, tableCF), new TimestampClock(System.currentTimeMillis())); + rm.delete(new QueryPath(HINTS_CF, key, tableCF), clock); rm.apply(); - } + } public static void deleteHintsForEndPoint(InetAddress endpoint) { @@ -185,30 +186,36 @@ public class HintedHandOffManager int rowsReplayed = 0; ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF); byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY; - while (true) - { - QueryFilter filter = 
QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE); - ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE); - if (pagingFinished(hintColumnFamily, startColumn)) - break; - Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns(); - for (IColumn keyColumn : keyColumns) + delivery: + while (true) { - startColumn = keyColumn.name(); - Collection<IColumn> tableCFs = keyColumn.getSubColumns(); - for (IColumn tableCF : tableCFs) + QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE); + ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), Integer.MAX_VALUE); + if (pagingFinished(hintColumnFamily, startColumn)) + break; + Collection<IColumn> keyColumns = hintColumnFamily.getSortedColumns(); + for (IColumn keyColumn : keyColumns) { - String[] parts = getTableAndCFNames(tableCF.name()); - if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name())) + startColumn = keyColumn.name(); + Collection<IColumn> tableCFs = keyColumn.getSubColumns(); + for (IColumn tableCF : tableCFs) { - deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name()); - rowsReplayed++; - } + String[] parts = getTableAndCFNames(tableCF.name()); + if (sendMessage(endpoint, parts[0], parts[1], keyColumn.name())) + { + deleteHintKey(endpoint.getAddress(), keyColumn.name(), tableCF.name(), tableCF.clock()); + rowsReplayed++; + } + else + { + logger_.info("Could not complete hinted handoff to " + endpoint); + break delivery; + } - startColumn = keyColumn.name(); + startColumn = keyColumn.name(); + } } } - } if (rowsReplayed > 0) { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=962683&r1=962682&r2=962683&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Fri Jul 9 20:27:30 2010 @@ -103,7 +103,7 @@ public class RowMutation { byte[] combined = HintedHandOffManager.makeCombinedName(rm.getTable(), cf.metadata().cfName); QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, rm.key(), combined); - add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis())); + add(path, ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis()), DatabaseDescriptor.getGcGraceInSeconds()); } }