Repository: cassandra Updated Branches: refs/heads/cassandra-3.1 2b8cdc234 -> ee46dfbf9
Avoid MV race during node decommission Patch by Paulo Motta; reviewed by Joel Knighton for CASSANDRA-10674 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c184e8c1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c184e8c1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c184e8c1 Branch: refs/heads/cassandra-3.1 Commit: c184e8c14b28eddc20cbdd098f5e47d1ed832898 Parents: a4da379 Author: Paulo Motta <[email protected]> Authored: Wed Nov 25 14:50:31 2015 -0800 Committer: T Jake Luciani <[email protected]> Committed: Fri Dec 4 15:50:48 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------ .../apache/cassandra/service/StorageProxy.java | 67 ++++++++++++-------- .../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++-- 4 files changed, 88 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f3f182..b95aa76 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.1 + * Avoid MV race during node decommission (CASSANDRA-10674) * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) * Handle single-column deletions correction in materialized views when the column is part of the view primary key (CASSANDRA-10796) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/db/view/ViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java index 089a3b7..4d9517f 100644 --- a/src/java/org/apache/cassandra/db/view/ViewUtils.java +++ 
b/src/java/org/apache/cassandra/db/view/ViewUtils.java @@ -21,13 +21,13 @@ package org.apache.cassandra.db.view; import java.net.InetAddress; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.NetworkTopologyStrategy; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public final class ViewUtils @@ -56,9 +56,9 @@ public final class ViewUtils * B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3) * C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3) * - * @throws RuntimeException if this method is called using a base token which does not belong to this replica + * @return Optional.empty() if this method is called using a base token which does not belong to this replica */ - public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) + public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) { AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); @@ -77,7 +77,7 @@ public final class ViewUtils { // If we are a base endpoint which is also a view replica, we use ourselves as our view replica if (viewEndpoint.equals(FBUtilities.getBroadcastAddress())) - return viewEndpoint; + return Optional.of(viewEndpoint); // We have to remove any endpoint which is shared between the base and the view, as it will select itself // and throw off the counts otherwise. 
@@ -95,20 +95,9 @@ public final class ViewUtils int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress()); if (baseIdx < 0) - { - - if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken, keyspaceName).size() > 0) - { - //Since there are pending endpoints we are going to write to the batchlog regardless. - //So we can pretend we are the views endpoint. - - return FBUtilities.getBroadcastAddress(); - } - - throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica"); - } - + //This node is not a base replica of this key, so we return empty + return Optional.empty(); - return viewEndpoints.get(baseIdx); + return Optional.of(viewEndpoints.get(baseIdx)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 15be7c6..397b8b9 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -670,12 +670,12 @@ public class StorageProxy implements StorageProxyMBean if (StorageService.instance.isStarting() || StorageService.instance.isJoining() || StorageService.instance.isMoving()) { BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), - mutations), - writeCommitLog); + mutations), writeCommitLog); } else { List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); + List<Mutation> nonPairedMutations = new LinkedList<>(); Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey); ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; @@ -684,40 +684,51 @@ public class StorageProxy implements StorageProxyMBean final Collection<InetAddress> batchlogEndpoints = 
Collections.singleton(FBUtilities.getBroadcastAddress()); BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); - // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) { String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); - InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); - List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint); - - WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation, - consistencyLevel, - consistencyLevel, - naturalEndpoints, - baseComplete, - WriteType.BATCH, - cleanup); - - // When local node is the endpoint and there are no pending nodes we can - // Just apply the mutation locally. - if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined()) + Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk); + Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + + if (pairedEndpoint.isPresent()) { - try - { - mutation.apply(writeCommitLog); - } - catch (Exception exc) + // When local node is the endpoint and there are no pending nodes we can + // Just apply the mutation locally. 
+ if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress()) + && pendingEndpoints.isEmpty() && StorageService.instance.isJoined()) + try + { + mutation.apply(writeCommitLog); + } + catch (Exception exc) + { + logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation); + throw exc; + } + else { - logger.error("Error applying local view update to keyspace {}: {}", mutation.getKeyspaceName(), mutation); - throw exc; + wrappers.add(wrapViewBatchResponseHandler(mutation, + consistencyLevel, + consistencyLevel, + Collections.singletonList(pairedEndpoint.get()), + baseComplete, + WriteType.BATCH, + cleanup)); } } else { - wrappers.add(wrapper); + //if there are no paired endpoints there are probably range movements going on, + //so we write to the local batchlog to replay later + if (pendingEndpoints.isEmpty()) + logger.warn("Received base materialized view mutation for key {} that does not belong " + + "to this node. There is probably a range movement happening (move or decommission), " + + "but this node hasn't updated its ring metadata yet. 
Adding mutation to " + + "local batchlog to be replayed later.", + mutation.key()); + nonPairedMutations.add(mutation); } } @@ -730,6 +741,12 @@ public class StorageProxy implements StorageProxyMBean // now actually perform the writes and wait for them to complete asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION); } + + if (!nonPairedMutations.isEmpty()) + { + BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), nonPairedMutations), + writeCommitLog); + } } } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java index 8fd0cfb..c238f36 100644 --- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java +++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.db.view; import java.net.InetAddress; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.junit.BeforeClass; import org.junit.Test; @@ -74,11 +75,12 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.setKeyspaceMetadata(meta); - InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", new StringToken("CA"), new StringToken("BB")); - Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint); + Assert.assertTrue(naturalEndpoint.isPresent()); + Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get()); } @@ -106,10 +108,42 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); 
Schema.instance.setKeyspaceMetadata(meta); - InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", new StringToken("CA"), new StringToken("BB")); - Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint); + Assert.assertTrue(naturalEndpoint.isPresent()); + Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get()); + } + + @Test + public void testBaseTokenDoesNotBelongToLocalReplicaShouldReturnEmpty() throws Exception + { + TokenMetadata metadata = StorageService.instance.getTokenMetadata(); + metadata.clearUnsafe(); + + // DC1 + metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1")); + metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2")); + + // DC2 + metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4")); + metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5")); + + Map<String, String> replicationMap = new HashMap<>(); + replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName()); + + replicationMap.put("DC1", "1"); + replicationMap.put("DC2", "1"); + + Keyspace.clear("Keyspace1"); + KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); + Schema.instance.setKeyspaceMetadata(meta); + + Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("AB"), + new StringToken("BB")); + + Assert.assertFalse(naturalEndpoint.isPresent()); } }
