Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 bc1f84129 -> b2f6ce961 refs/heads/cassandra-3.11 684120deb -> 32f9e6763 refs/heads/trunk 71a27ee2b -> 9fecf9477
Delay hints store excise by write timeout to avoid race with decommission patch by Jaydeepkumar Chovatia; reviewed by Aleksey Yeshchenko for CASSANDRA-13740 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2f6ce96 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2f6ce96 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2f6ce96 Branch: refs/heads/cassandra-3.0 Commit: b2f6ce961f38a3e4cd744e102026bf7a471056c9 Parents: bc1f841 Author: Jaydeepkumar Chovatia <chovatia.jayd...@gmail.com> Authored: Thu Aug 3 15:34:26 2017 -0700 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Mon Apr 30 17:32:05 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/hints/HintsCatalog.java | 7 ++ .../apache/cassandra/hints/HintsService.java | 2 +- .../org/apache/cassandra/hints/HintsStore.java | 7 ++ .../cassandra/service/StorageService.java | 7 +- .../cassandra/hints/HintsCatalogTest.java | 114 ++++++++++++++++++- 6 files changed, 135 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cf470d6..857cf96 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.17 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740) * Deprecate background repair and probablistic read_repair_chance table options (CASSANDRA-13910) * Add missed CQL keywords to documentation (CASSANDRA-14359) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsCatalog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsCatalog.java 
b/src/java/org/apache/cassandra/hints/HintsCatalog.java index 6d01629..d1f6fba 100644 --- a/src/java/org/apache/cassandra/hints/HintsCatalog.java +++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java @@ -23,6 +23,7 @@ import java.nio.file.Files; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; +import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; @@ -94,6 +95,12 @@ final class HintsCatalog : store; } + @Nullable + HintsStore getNullable(UUID hostId) + { + return stores.get(hostId); + } + /** * Delete all hints for all host ids. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java index 9cd4ed3..268ee1f 100644 --- a/src/java/org/apache/cassandra/hints/HintsService.java +++ b/src/java/org/apache/cassandra/hints/HintsService.java @@ -302,7 +302,7 @@ public final class HintsService implements HintsServiceMBean */ public void excise(UUID hostId) { - HintsStore store = catalog.get(hostId); + HintsStore store = catalog.getNullable(hostId); if (store == null) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/hints/HintsStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java b/src/java/org/apache/cassandra/hints/HintsStore.java index c066331..bb3aa0f 100644 --- a/src/java/org/apache/cassandra/hints/HintsStore.java +++ b/src/java/org/apache/cassandra/hints/HintsStore.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; +import com.google.common.annotations.VisibleForTesting; import 
com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,6 +78,12 @@ final class HintsStore return new HintsStore(hostId, hintsDirectory, writerParams, descriptors); } + @VisibleForTesting + int getDispatchQueueSize() + { + return dispatchDequeue.size(); + } + InetAddress address() { return StorageService.instance.getEndpointForHostId(hostId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 77fcb81..5f76f7d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2276,7 +2276,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE UUID hostId = tokenMetadata.getHostId(endpoint); if (hostId != null && tokenMetadata.isMember(endpoint)) - HintsService.instance.excise(hostId); + { + // enough time for writes to expire and MessagingService timeout reporter callback to fire, which is where + // hints are mostly written from - using getMinRpcTimeout() / 2 for the interval. 
+ long delay = DatabaseDescriptor.getMinRpcTimeout() + DatabaseDescriptor.getWriteRpcTimeout(); + ScheduledExecutors.optionalTasks.schedule(() -> HintsService.instance.excise(hostId), delay, TimeUnit.MILLISECONDS); + } removeEndpoint(endpoint); tokenMetadata.removeEndpoint(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2f6ce96/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java index 51b6aa3..928fd31 100644 --- a/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsCatalogTest.java @@ -19,16 +19,42 @@ package org.apache.cassandra.hints; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.util.*; import com.google.common.collect.ImmutableMap; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.FBUtilities; +import org.junit.BeforeClass; import org.junit.Test; import static junit.framework.Assert.*; +import static org.apache.cassandra.Util.dk; public class HintsCatalogTest { + private static final String KEYSPACE = "hint_test"; + private static final String TABLE0 = "table_0"; + private static final String TABLE1 = "table_1"; + private static final String TABLE2 = "table_2"; + private static final int WRITE_BUFFER_SIZE = 256 << 10; + + @BeforeClass + public static void defineSchema() + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE0), + SchemaLoader.standardCFMD(KEYSPACE, TABLE1), + 
SchemaLoader.standardCFMD(KEYSPACE, TABLE2)); + } + @Test public void loadCompletenessAndOrderTest() throws IOException { @@ -43,7 +69,21 @@ public class HintsCatalogTest } } - public static void loadCompletenessAndOrderTest(File directory) throws IOException + @Test + public void exciseHintFiles() throws IOException + { + File directory = Files.createTempDirectory(null).toFile(); + try + { + exciseHintFiles(directory); + } + finally + { + directory.deleteOnExit(); + } + } + + private void loadCompletenessAndOrderTest(File directory) throws IOException { UUID hostId1 = UUID.randomUUID(); UUID hostId2 = UUID.randomUUID(); @@ -79,6 +119,39 @@ public class HintsCatalogTest assertNull(store2.poll()); } + private static void exciseHintFiles(File directory) throws IOException + { + UUID hostId = UUID.randomUUID(); + + HintsDescriptor descriptor1 = new HintsDescriptor(hostId, System.currentTimeMillis()); + HintsDescriptor descriptor2 = new HintsDescriptor(hostId, System.currentTimeMillis() + 1); + HintsDescriptor descriptor3 = new HintsDescriptor(hostId, System.currentTimeMillis() + 2); + HintsDescriptor descriptor4 = new HintsDescriptor(hostId, System.currentTimeMillis() + 3); + + createHintFile(directory, descriptor1); + createHintFile(directory, descriptor2); + createHintFile(directory, descriptor3); + createHintFile(directory, descriptor4); + + HintsCatalog catalog = HintsCatalog.load(directory, ImmutableMap.of()); + assertEquals(1, catalog.stores().count()); + + HintsStore store = catalog.get(hostId); + + //should have 4 hint files + assertEquals(4, store.getDispatchQueueSize()); + + //excise store as a result it should remove all the hint files + catalog.exciseStore(hostId); + + catalog = HintsCatalog.load(directory, ImmutableMap.of()); + assertEquals(0, catalog.stores().count()); + store = catalog.get(hostId); + + //should have 0 hint files now + assertEquals(0, store.getDispatchQueueSize()); + } + @SuppressWarnings("EmptyTryBlock") private static void 
writeDescriptor(File directory, HintsDescriptor descriptor) throws IOException { @@ -86,4 +159,43 @@ public class HintsCatalogTest { } } + + private static Mutation createMutation(String key, long now) + { + Mutation mutation = new Mutation(KEYSPACE, dk(key)); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE0), now, mutation) + .clustering("column0") + .add("val", "value0") + .build(); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE1), now + 1, mutation) + .clustering("column1") + .add("val", "value1") + .build(); + + new RowUpdateBuilder(Schema.instance.getCFMetaData(KEYSPACE, TABLE2), now + 2, mutation) + .clustering("column2") + .add("val", "value2") + .build(); + + return mutation; + } + + @SuppressWarnings("EmptyTryBlock") + private static void createHintFile(File directory, HintsDescriptor descriptor) throws IOException + { + try (HintsWriter writer = HintsWriter.create(directory, descriptor)) + { + ByteBuffer writeBuffer = ByteBuffer.allocateDirect(WRITE_BUFFER_SIZE); + try (HintsWriter.Session session = writer.newSession(writeBuffer)) + { + long now = FBUtilities.timestampMicros(); + Mutation mutation = createMutation("testSerializer", now); + Hint hint = Hint.create(mutation, now / 1000); + + session.append(hint); + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org