Updated Branches: refs/heads/trunk a4288fdc5 -> 488122136
Make hint delivery asynchronous patch by Alexey Zotov; reviewed by jbellis for CASSANDRA-4761 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48812213 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48812213 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48812213 Branch: refs/heads/trunk Commit: 4881221363f984ab6610756cab38e1a016b79e15 Parents: a4288fd Author: Jonathan Ellis <[email protected]> Authored: Thu Oct 11 16:12:21 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Oct 11 16:12:25 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/HintedHandOffManager.java | 68 +++++++++------ .../cassandra/service/WriteResponseHandler.java | 7 ++ test/unit/org/apache/cassandra/Util.java | 5 + test/unit/org/apache/cassandra/db/TableTest.java | 38 ++++++++ 5 files changed, 91 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 15bebf3..94f65ad 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * Make hint delivery asynchronous (CASSANDRA-4761) * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610) * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661) * Add ability to use custom TServerFactory implementations (CASSANDRA-4608) http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index bc503ea..2d88ecc 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -25,6 +25,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -46,12 +47,12 @@ import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; @@ -125,13 +126,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES); } - private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws WriteTimeoutException - { - AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH); - MessagingService.instance().sendRR(message, endpoint, responseHandler); - responseHandler.get(); - } - private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException { RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes); @@ -287,10 +281,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean // find the hints for the node using its token. UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint); logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint); - ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); + final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes); - int rowsReplayed = 0; + final AtomicInteger rowsReplayed = new AtomicInteger(0); ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; int pageSize = PAGE_SIZE; @@ -307,15 +301,28 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB(); RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024); - delivery: while (true) { QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize); ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000)); if (pagingFinished(hintsPage, startColumn)) - break; + { + if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn)) + { + // we've started from the beginning and could not find anything (only maybe tombstones) + break; + } + else + { + // restart query from the first column until we read an empty row; + // that will tell us everything was delivered successfully with no timeouts + startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER; + continue; + } + + } - for (IColumn hint : hintsPage.getSortedColumns()) + for (final IColumn hint : hintsPage.getSortedColumns()) { // Skip tombstones: // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond @@ -338,29 +345,33 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean catch (UnknownColumnFamilyException e) { logger.debug("Skipping delivery of hint for deleted columnfamily", e); - rm = null; + deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); + continue; } - try + MessageOut<RowMutation> message = rm.createMessage(); + rateLimiter.acquire(message.serializedSize(MessagingService.current_version)); + WrappedRunnable callback = new WrappedRunnable() { - if (rm != null) + public void runMayThrow() throws IOException { - MessageOut<RowMutation> message = rm.createMessage(); - rateLimiter.acquire(message.serializedSize(MessagingService.current_version)); - sendMutation(endpoint, message); - rowsReplayed++; + rowsReplayed.incrementAndGet(); + deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); } - deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp()); - } - catch (WriteTimeoutException e) - { - logger.info(String.format("Timed out replaying hints to %s; aborting further deliveries", endpoint)); - break delivery; - } + }; + IAsyncCallback responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback); + MessagingService.instance().sendRR(message, endpoint, responseHandler); + } + + // check if node is still alive and we should continue delivery process + if (!FailureDetector.instance.isAlive(endpoint)) + { + logger.debug("Endpoint {} died during hint delivery, aborting", endpoint); + return; } } - if (rowsReplayed > 0) + if (rowsReplayed.get() > 0) { try { @@ -489,4 +500,5 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean } return rows; } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 707a583..ab7223a 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -55,6 +55,13 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler responses = new AtomicInteger(blockFor); } + public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback) + { + super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, callback, writeType); + blockFor = 1; + responses = new AtomicInteger(1); + } + public WriteResponseHandler(InetAddress endpoint, WriteType writeType) { super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL, null, writeType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 2ce7ca8..1e9031e 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -80,6 +80,11 @@ public class Util return new Column(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp); } + public static Column expiringColumn(String name, String value, long timestamp, int ttl) + { + return new ExpiringColumn(ByteBufferUtil.bytes(name), ByteBufferUtil.bytes(value), timestamp, ttl); + } + public static Column counterColumn(String name, long value, long timestamp) { return new CounterUpdateColumn(ByteBufferUtil.bytes(name), value, timestamp); http://git-wip-us.apache.org/repos/asf/cassandra/blob/48812213/test/unit/org/apache/cassandra/db/TableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/TableTest.java b/test/unit/org/apache/cassandra/db/TableTest.java index b287744..4244e1c 100644 --- a/test/unit/org/apache/cassandra/db/TableTest.java +++ b/test/unit/org/apache/cassandra/db/TableTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.utils.WrappedRunnable; import static org.apache.cassandra.Util.column; +import static org.apache.cassandra.Util.expiringColumn; import static org.apache.cassandra.Util.getBytes; import org.apache.cassandra.Util; import org.apache.cassandra.db.filter.QueryPath; @@ -325,6 +326,43 @@ public class TableTest extends SchemaLoader } @Test + public void testGetSliceWithExpiration() throws Throwable + { + // tests slicing against data from one row with expiring column in a memtable and then flushed to an sstable + final Table table = Table.open("Keyspace1"); + final ColumnFamilyStore cfStore = table.getColumnFamilyStore("Standard1"); + final DecoratedKey ROW = Util.dk("row5"); + + RowMutation rm = new RowMutation("Keyspace1", ROW.key); + ColumnFamily cf = ColumnFamily.create("Keyspace1", "Standard1"); + cf.addColumn(column("col1", "val1", 1L)); + cf.addColumn(expiringColumn("col2", "val2", 1L, 1)); + cf.addColumn(column("col3", "val3", 1L)); + + rm.add(cf); + rm.apply(); + cfStore.forceBlockingFlush(); + + Runnable verify = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + ColumnFamily cf; + + cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 2); + assertColumns(cf, "col1", "col2"); + assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE), "col1"); + + cf = cfStore.getColumnFamily(ROW, new QueryPath("Standard1"), ByteBufferUtil.bytes("col2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1); + assertColumns(cf, "col2"); + assertColumns(ColumnFamilyStore.removeDeleted(cf, Integer.MAX_VALUE)); + } + }; + + reTest(table.getColumnFamilyStore("Standard1"), verify); + } + + @Test public void testGetSliceFromAdvanced() throws Throwable { // tests slicing against data from one row spread across two sstables
