Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt build.xml debian/changelog src/java/org/apache/cassandra/db/BatchlogManager.java src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/db/HintedHandOffManager.java src/java/org/apache/cassandra/db/SystemKeyspace.java src/java/org/apache/cassandra/service/StorageProxy.java test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66af6fed Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66af6fed Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66af6fed Branch: refs/heads/cassandra-2.1 Commit: 66af6fedc02eed630028043f8a6f0d3014f193d5 Parents: de8a479 384de4b Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Apr 18 03:14:47 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Apr 18 03:14:47 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 11 +- .../apache/cassandra/db/BatchlogManager.java | 102 +++++++++++-------- .../apache/cassandra/db/ColumnFamilyStore.java | 6 -- .../cassandra/db/HintedHandOffManager.java | 19 +--- .../org/apache/cassandra/db/SystemKeyspace.java | 55 +++++++--- .../db/commitlog/CommitLogReplayer.java | 12 +-- .../apache/cassandra/service/StorageProxy.java | 9 +- .../cassandra/db/BatchlogManagerTest.java | 84 +++++++++++++-- .../apache/cassandra/db/HintedHandOffTest.java | 19 ++-- 10 files changed, 214 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 9f34023,ad26f6d..705f1b8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -108,6 -64,6 +108,7 @@@ Merged from 1.2 * Schedule schema pulls on change (CASSANDRA-6971) * Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980) * Shutdown batchlog executor in SS#drain() (CASSANDRA-7025) ++ * Fix batchlog to account for CF truncation records (CASSANDRA-6999) 2.0.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index 9567ef3,05f9392..ac78a73 --- 
a/NEWS.txt +++ b/NEWS.txt @@@ -13,46 -13,16 +13,55 @@@ restore snapshots created with the prev 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +2.1 +=== + +New features +------------ + - SSTable data directory name is slightly changed. Each directory will + have hex string appended after CF name, e.g. + ks/cf-5be396077b811e3a3ab9dc4b9ac088d/ + This hex string part represents unique ColumnFamily ID. + Note that existing directories are used as is, so only newly created + directories after upgrade have new directory name format. + - Saved key cache files also have ColumnFamily ID in their file name. + - It is now possible to do incremental repairs, sstables that have been + repaired are marked with a timestamp and not included in the next + repair session. Use nodetool repair -par -inc to use this feature. + A tool to manually mark/unmark sstables as repaired is available in + tools/bin/sstablerepairedset. + +Upgrading +--------- + - Rolling upgrades from anything pre-2.0.7 is not supported. Furthermore + pre-2.0 sstables are not supported. This means that before upgrading + a node on 2.1, this node must be started on 2.0 and + 'nodetool upgradesstables' must be run (and this even in the case + of not-rolling upgrades). + - For size-tiered compaction users, Cassandra now defaults to ignoring + the coldest 5% of sstables. This can be customized with the + cold_reads_to_omit compaction option; 0.0 omits nothing (the old + behavior) and 1.0 omits everything. + - Multithreaded compaction has been removed. + - Counters implementation has been changed, replaced by a safer one with + fewer caveats, but different performance characteristics. You might have + to change your data model to accommodate the new implementation. + (See https://issues.apache.org/jira/browse/CASSANDRA-6504 and the dev + blog post at http://www.datastax.com/dev/blog/<PLACEHOLDER> for details). 
+ - (per-table) index_interval parameter has been replaced with + min_index_interval and max_index_interval parameters. index_interval + has been deprecated. + + 2.0.7 + ===== + + Upgrading + --------- + - Nothing specific to this release, but please see 2.0.6 if you are upgrading + from a previous version. + + 2.0.6 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/BatchlogManager.java index 02c029d,5770994..7f86a6e --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@@ -244,45 -247,72 +240,72 @@@ public class BatchlogManager implement { DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); int size = in.readInt(); - List<RowMutation> mutations = new ArrayList<>(size); ++ List<Mutation> mutations = new ArrayList<>(size); + for (int i = 0; i < size; i++) - replaySerializedMutation(Mutation.serializer.deserialize(in, version), writtenAt, version, rateLimiter); + { - RowMutation mutation = RowMutation.serializer.deserialize(in, version); ++ Mutation mutation = Mutation.serializer.deserialize(in, version); + + // Remove CFs that have been truncated since. writtenAt and SystemKeyspace#getTruncatedAt() both return millis. + // We don't abort the replay entirely b/c this can be considered a success (truncated is the same as delivered then + // truncated). 
+ for (UUID cfId : mutation.getColumnFamilyIds()) + if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId)) + mutation = mutation.without(cfId); + + if (!mutation.isEmpty()) + mutations.add(mutation); + } + + if (!mutations.isEmpty()) + replayMutations(mutations, writtenAt, version, rateLimiter); } /* * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints * when a replica is down or a write request times out. */ - private void replaySerializedMutation(Mutation mutation, long writtenAt, int version, RateLimiter rateLimiter) - private void replayMutations(List<RowMutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException ++ private void replayMutations(List<Mutation> mutations, long writtenAt, int version, RateLimiter rateLimiter) throws IOException { - int ttl = calculateHintTTL(mutation, writtenAt); + int ttl = calculateHintTTL(mutations, writtenAt); if (ttl <= 0) - return; // the mutation isn't safe to replay. - - Set<InetAddress> liveEndpoints = new HashSet<>(); - String ks = mutation.getKeyspaceName(); - Token<?> tk = StorageService.getPartitioner().getToken(mutation.key()); - int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version); + return; // this batchlog entry has 'expired' - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) - for (RowMutation mutation : mutations) ++ for (Mutation mutation : mutations) { - rateLimiter.acquire(mutationSize); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - mutation.apply(); - else if (FailureDetector.instance.isAlive(endpoint)) - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. 
- else - StorageProxy.writeHintForMutation(mutation, ttl, endpoint); - } + List<InetAddress> liveEndpoints = new ArrayList<>(); + List<InetAddress> hintEndpoints = new ArrayList<>(); - if (!liveEndpoints.isEmpty()) - attemptDirectDelivery(mutation, writtenAt, liveEndpoints); + String ks = mutation.getKeyspaceName(); + Token tk = StorageService.getPartitioner().getToken(mutation.key()); - int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, version); ++ int mutationSize = (int) Mutation.serializer.serializedSize(mutation, version); + + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), + StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + { + rateLimiter.acquire(mutationSize); + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + mutation.apply(); + else if (FailureDetector.instance.isAlive(endpoint)) + liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. + else + hintEndpoints.add(endpoint); + } + + if (!liveEndpoints.isEmpty()) + hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints)); + + for (InetAddress endpoint : hintEndpoints) + StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint); + } } - private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints) + // Returns the endpoints we failed to deliver to. 
- private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, List<InetAddress> endpoints) throws IOException ++ private Set<InetAddress> attemptDirectDelivery(Mutation mutation, List<InetAddress> endpoints) throws IOException { - List<WriteResponseHandler> handlers = Lists.newArrayList(); - final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<>(endpoints); + final List<WriteResponseHandler> handlers = new ArrayList<>(); + final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>()); + for (final InetAddress ep : endpoints) { Runnable callback = new Runnable() @@@ -310,22 -340,26 +333,21 @@@ } } - if (!undelivered.isEmpty()) - { - int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl - if (ttl > 0) - for (InetAddress endpoint : undelivered) - StorageProxy.writeHintForMutation(mutation, ttl, endpoint); - } + return undelivered; } - // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog). - // this ensures that deletes aren't "undone" by an old batch replay. - private int calculateHintTTL(Mutation mutation, long writtenAt) + /* + * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. 
+ */ - private int calculateHintTTL(List<RowMutation> mutations, long writtenAt) ++ private int calculateHintTTL(List<Mutation> mutations, long writtenAt) { - return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000); + int unadjustedTTL = Integer.MAX_VALUE; - for (RowMutation mutation : mutations) ++ for (Mutation mutation : mutations) + unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation)); + return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); } - private static ByteBuffer columnName(String name) - { - return CFMetaData.BatchlogCf.getCfDef().getColumnNameBuilder().add(UTF8Type.instance.decompose(name)).build(); - } - // force flush + compaction to reclaim space from the replayed batches private void cleanup() throws ExecutionException, InterruptedException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 923ea5b,36bc470..8f96765 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -2795,17 -2416,6 +2795,11 @@@ public class ColumnFamilyStore implemen return getDataTracker().getDroppableTombstoneRatio(); } - public long getTruncationTime() - { - Pair<ReplayPosition, Long> truncationRecord = SystemKeyspace.getTruncationRecords().get(metadata.cfId); - return truncationRecord == null ? 
Long.MIN_VALUE : truncationRecord.right; - } - + public long trueSnapshotsSize() + { + return directories.trueSnapshotsSize(); + } + @VisibleForTesting void resetFileIndexGenerator() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/HintedHandOffManager.java index 12404da,942707e..e83aefc --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@@ -121,7 -119,7 +120,7 @@@ public class HintedHandOffManager imple * Returns a mutation representing a Hint to be sent to <code>targetId</code> * as soon as it becomes available again. */ - public Mutation hintFor(Mutation mutation, int ttl, UUID targetId) - public RowMutation hintFor(RowMutation mutation, long now, int ttl, UUID targetId) ++ public Mutation hintFor(Mutation mutation, long now, int ttl, UUID targetId) { assert ttl > 0; @@@ -134,11 -132,11 +133,11 @@@ UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - ByteBuffer name = comparator.decompose(hintId, MessagingService.current_version); - ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version)); + CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version); + ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF)); - cf.addColumn(name, value, System.currentTimeMillis(), ttl); + cf.addColumn(name, value, now, ttl); - return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf); + return new Mutation(Keyspace.SYSTEM_KS, 
UUIDType.instance.decompose(targetId), cf); } /* @@@ -389,8 -387,7 +388,7 @@@ } List<WriteResponseHandler> responseHandlers = Lists.newArrayList(); - Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>(); - for (final Column hint : hintsPage) + for (final Cell hint : hintsPage) { // check if hints delivery has been paused during the process if (hintedHandOffPaused) @@@ -427,21 -425,12 +425,12 @@@ throw new AssertionError(e); } - truncationTimesCache.clear(); - for (UUID cfId : ImmutableSet.copyOf((mutation.getColumnFamilyIds()))) - for (UUID cfId : rm.getColumnFamilyIds()) ++ for (UUID cfId : mutation.getColumnFamilyIds()) { - Long truncatedAt = truncationTimesCache.get(cfId); - if (truncatedAt == null) - { - ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(cfId); - truncatedAt = cfs.getTruncationTime(); - truncationTimesCache.put(cfId, truncatedAt); - } - - if (hint.timestamp() < truncatedAt) - if (hint.maxTimestamp() <= SystemKeyspace.getTruncatedAt(cfId)) ++ if (hint.timestamp() <= SystemKeyspace.getTruncatedAt(cfId)) { logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId); - rm = rm.without(cfId); + mutation = mutation.without(cfId); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index a4072d1,fe8f179..b19eb1e --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -41,10 -40,8 +41,10 @@@ import org.apache.cassandra.config.Sche import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; - import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; + 
import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.composites.Composites; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.Range; @@@ -90,12 -86,8 +90,14 @@@ public class SystemKeyspac private static final String LOCAL_KEY = "local"; private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local"); + public static final List<String> allSchemaCfs = Arrays.asList(SCHEMA_KEYSPACES_CF, + SCHEMA_COLUMNFAMILIES_CF, + SCHEMA_COLUMNS_CF, + SCHEMA_TRIGGERS_CF, + SCHEMA_USER_TYPES_CF); + + private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; + public enum BootstrapState { NEEDS_BOOTSTRAP, http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index a9d061a,dce7256..1b137ca --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -911,7 -900,10 +911,10 @@@ public class StorageProxy implements St return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); } - public static void writeHintForMutation(Mutation mutation, int ttl, InetAddress target) + /** + * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs + */ - public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target) ++ public static void writeHintForMutation(Mutation mutation, long now, int ttl, InetAddress target) { assert ttl > 0; UUID hostId = 
StorageService.instance.getTokenMetadata().getHostId(target); http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/BatchlogManagerTest.java index 43f96fb,954c1f2..9982be9 --- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java @@@ -28,9 -31,8 +31,10 @@@ import org.apache.cassandra.Util import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.composites.CellNameType; + import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.UUIDGen; @@@ -106,4 -103,66 +110,72 @@@ public class BatchlogManagerTest extend UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\"")); assertEquals(500, result.one().getLong("count")); } + + @Test + public void testTruncatedReplay() throws InterruptedException, ExecutionException + { ++ CellNameType comparator2 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2").metadata.comparator; ++ CellNameType comparator3 = Keyspace.open("Keyspace1").getColumnFamilyStore("Standard3").metadata.comparator; + // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog. + // Each batchlog entry with a mutation for Standard2 and Standard3. + // In the middle of the process, 'truncate' Standard2. 
+ for (int i = 0; i < 1000; i++) + { - RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i)); - mutation1.add("Standard2", bytes(i), bytes(i), 0); - RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i)); - mutation2.add("Standard3", bytes(i), bytes(i), 0); - List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2); ++ Mutation mutation1 = new Mutation("Keyspace1", bytes(i)); ++ mutation1.add("Standard2", comparator2.makeCellName(bytes(i)), bytes(i), 0); ++ Mutation mutation2 = new Mutation("Keyspace1", bytes(i)); ++ mutation2.add("Standard3", comparator3.makeCellName(bytes(i)), bytes(i), 0); ++ List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2); + + // Make sure it's ready to be replayed, so adjust the timestamp. + long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2; + + if (i == 500) + SystemKeyspace.saveTruncationRecord(Keyspace.open("Keyspace1").getColumnFamilyStore("Standard2"), + timestamp, + ReplayPosition.NONE); + + // Adjust the timestamp (slightly) to make the test deterministic. + if (i >= 500) + timestamp++; + else + timestamp--; + - BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply(); ++ BatchlogManager.getBatchlogMutationFor(mutations, ++ UUIDGen.getTimeUUID(), ++ MessagingService.current_version, ++ timestamp * 1000) ++ .apply(); + } + + // Flush the batchlog to disk (see CASSANDRA-6822). + Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceFlush(); + + // Force batchlog replay. + BatchlogManager.instance.replayAllFailedBatches(); + + // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied. 
+ for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i)); + if (i >= 500) + { + assertEquals(bytes(i), result.one().getBytes("key")); + assertEquals(bytes(i), result.one().getBytes("column1")); + assertEquals(bytes(i), result.one().getBytes("value")); + } + else + { + assertTrue(result.isEmpty()); + } + } + + for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i)); + assertEquals(bytes(i), result.one().getBytes("key")); + assertEquals(bytes(i), result.one().getBytes("column1")); + assertEquals(bytes(i), result.one().getBytes("value")); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/66af6fed/test/unit/org/apache/cassandra/db/HintedHandOffTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/HintedHandOffTest.java index c3b9367,9ffd702..622c816 --- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java +++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java @@@ -28,8 -27,9 +27,10 @@@ import java.util.concurrent.TimeUnit import org.junit.Test; + import com.google.common.collect.Iterators; + import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy; @@@ -63,10 -61,14 +62,14 @@@ public class HintedHandOffTest extends hintStore.disableAutoCompaction(); // insert 1 hint - RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1)); - rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); + Mutation rm = new Mutation(KEYSPACE4, 
ByteBufferUtil.bytes(1)); + rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); - HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply(); + HintedHandOffManager.instance.hintFor(rm, + System.currentTimeMillis(), + HintedHandOffManager.calculateHintTTL(rm), + UUID.randomUUID()) + .apply(); // flush data to disk hintStore.forceBlockingFlush(); @@@ -102,10 -104,14 +105,14 @@@ hintStore.clearUnsafe(); // insert 1 hint - RowMutation rm = new RowMutation(KEYSPACE4, ByteBufferUtil.bytes(1)); - rm.add(STANDARD1_CF, ByteBufferUtil.bytes(String.valueOf(COLUMN1)), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); + Mutation rm = new Mutation(KEYSPACE4, ByteBufferUtil.bytes(1)); + rm.add(STANDARD1_CF, Util.cellname(COLUMN1), ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); - HintedHandOffManager.instance.hintFor(rm, HintedHandOffManager.calculateHintTTL(rm), UUID.randomUUID()).apply(); + HintedHandOffManager.instance.hintFor(rm, + System.currentTimeMillis(), + HintedHandOffManager.calculateHintTTL(rm), + UUID.randomUUID()) + .apply(); assert getNoOfHints() == 1;