Repository: cassandra Updated Branches: refs/heads/trunk 0766f7e54 -> 877b08eaf
Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters Patch by Ariel Weisberg; Reviewed by Dinesh Joshi for CASSANDRA-14841 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/877b08ea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/877b08ea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/877b08ea Branch: refs/heads/trunk Commit: 877b08eaf0e02542c9f6d9f8cd457a8e44b4febf Parents: 0766f7e Author: Ariel Weisberg <[email protected]> Authored: Mon Oct 29 15:26:22 2018 -0400 Committer: Ariel Weisberg <[email protected]> Committed: Thu Nov 1 12:12:34 2018 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 6 +++ src/java/org/apache/cassandra/gms/Gossiper.java | 43 ++++++++++++++- .../apache/cassandra/gms/VersionedValue.java | 7 +++ .../repair/SystemDistributedKeyspace.java | 16 ++++++ .../cassandra/tracing/TraceStateImpl.java | 11 ++++ .../org/apache/cassandra/gms/GossiperTest.java | 55 +++++++++++++++++--- 7 files changed, 130 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f49531c..b7c0398 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841) * Avoid running query to self through messaging service (CASSANDRA-14807) * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373) * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 5066378..3267c91 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -101,6 +101,12 @@ New features Upgrading --------- + - Additional columns have been added to system_distributed.repair_history, + system_traces.sessions and system_traces.events. As a result select * queries + against these tables will fail and generate an error in the log + during upgrade when the cluster is mixed version. Additionally these + tables will not be written to if repair or tracing occurs until + the entire cluster is upgraded and there are no 3.X version nodes in Gossip. - Timestamp ties between values resolve differently: if either value has a TTL, this value always wins. This is to provide consistent reconciliation before and after the value expires into a tombstone. http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 170843b..aedcb04 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -31,6 +32,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -108,7 +110,8 @@ public class 
Gossiper implements IFailureDetectionEventListener, GossiperMBean private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>(); /* live member set */ - private final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>(); + @VisibleForTesting + final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>(); /* unreachable member set */ private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>(); @@ -136,6 +139,39 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean private volatile long lastProcessedMessageAt = System.currentTimeMillis(); + //This property and anything that checks it should be removed in 5.0 + private boolean haveMajorVersion3Nodes = true; + + final com.google.common.base.Supplier<Boolean> haveMajorVersion3NodesSupplier = () -> + { + //Once there are no prior version nodes we don't need to keep rechecking + if (!haveMajorVersion3Nodes) + return false; + + Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); + CassandraVersion referenceVersion = null; + + for (InetAddressAndPort host : allHosts) + { + CassandraVersion version = getReleaseVersion(host); + + //Raced with changes to gossip state + if (version == null) + continue; + + if (referenceVersion == null) + referenceVersion = version; + + if (version.major < 4) + return true; + } + + haveMajorVersion3Nodes = false; + return false; + }; + + private final Supplier<Boolean> haveMajorVersion3NodesMemoized = Suppliers.memoizeWithExpiration(haveMajorVersion3NodesSupplier, 1, TimeUnit.MINUTES); + private class GossipTask implements Runnable { public void run() @@ -1906,6 +1942,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } } + public boolean haveMajorVersion3Nodes() + { + return haveMajorVersion3NodesMemoized.get(); + } + private 
boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes) { UUID expectedVersion = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 691f544..94b8cb8 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -24,6 +24,7 @@ import java.util.UUID; import static java.nio.charset.StandardCharsets.ISO_8859_1; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import org.apache.cassandra.db.TypeSizes; @@ -265,6 +266,12 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(FBUtilities.getReleaseVersionString()); } + @VisibleForTesting + public VersionedValue releaseVersion(String version) + { + return new VersionedValue(version); + } + public VersionedValue networkVersion() { return new VersionedValue(String.valueOf(MessagingService.current_version)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index fc09e71..d4b7259 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -44,6 +44,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; import 
org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -187,6 +188,11 @@ public final class SystemDistributedKeyspace public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange) { + //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences + if (Gossiper.instance.haveMajorVersion3Nodes()) + return; + InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort(); Set<String> participants = Sets.newHashSet(); Set<String> participants_v2 = Sets.newHashSet(); @@ -230,6 +236,11 @@ public final class SystemDistributedKeyspace public static void successfulRepairJob(UUID id, String keyspaceName, String cfname) { + //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences + if (Gossiper.instance.haveMajorVersion3Nodes()) + return; + String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()) WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s"; String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY, RepairState.SUCCESS.toString(), @@ -241,6 +252,11 @@ public final class SystemDistributedKeyspace public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable t) { + //Don't record repair history if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences + if (Gossiper.instance.haveMajorVersion3Nodes()) + return; + String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()), exception_message=?, exception_stacktrace=? 
WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s"; StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/tracing/TraceStateImpl.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java index 2722406..e16f778 100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@ -35,6 +35,7 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -102,6 +103,11 @@ public class TraceStateImpl extends TraceState void executeMutation(final Mutation mutation) { + //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences + if (Gossiper.instance.haveMajorVersion3Nodes()) + return; + CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable() { protected void runMayThrow() @@ -117,6 +123,11 @@ public class TraceStateImpl extends TraceState static void mutateWithCatch(Mutation mutation) { + //Don't record trace state if an upgrade is in progress as version 3 nodes generates errors + //due to schema differences + if (Gossiper.instance.haveMajorVersion3Nodes()) + return; + try { StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/test/unit/org/apache/cassandra/gms/GossiperTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java index b856983..a78d300 100644 --- a/test/unit/org/apache/cassandra/gms/GossiperTest.java +++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java @@ -42,6 +42,8 @@ import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class GossiperTest { @@ -74,6 +76,43 @@ public class GossiperTest } @Test + public void testHaveVersion3Nodes() throws Exception + { + VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null); + EndpointState es = new EndpointState(null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("4.0-SNAPSHOT")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1")); + + + es = new EndpointState(null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2")); + + + es = new EndpointState(null); + es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0")); + Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"), es); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3")); + + + assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get()); + + Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2")); + Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2")); 
+ + + assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get()); + + Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3")); + Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3")); + + assertFalse(Gossiper.instance.haveMajorVersion3NodesSupplier.get()); + + } + + @Test public void testLargeGenerationJump() throws UnknownHostException, InterruptedException { Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2); @@ -136,15 +175,15 @@ public class GossiperTest // Check that the new entry was added Assert.assertEquals(nextSize, loadedList.size()); for (InetAddressAndPort a : nextSeeds) - Assert.assertTrue(loadedList.contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); // Check that the return value of the reloadSeeds matches the content of the getSeeds call // and that they both match the internal contents of the Gossiper seeds list Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size()); for (InetAddressAndPort a : gossiper.seeds) { - Assert.assertTrue(loadedList.contains(a.toString())); - Assert.assertTrue(gossiper.getSeeds().contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); + assertTrue(gossiper.getSeeds().contains(a.toString())); } // Add a duplicate of the last address to the seed provider list @@ -157,7 +196,7 @@ public class GossiperTest // Check that the number of seed nodes reported hasn't increased Assert.assertEquals(uniqueSize, loadedList.size()); for (InetAddressAndPort a : nextSeeds) - Assert.assertTrue(loadedList.contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); // Create a new list that has no overlaps with the previous list addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1")); @@ -176,8 +215,8 @@ public class GossiperTest Assert.assertEquals(disjointSize, loadedList.size()); for (InetAddressAndPort a : disjointSeeds) { - 
Assert.assertTrue(gossiper.getSeeds().contains(a.toString())); - Assert.assertTrue(loadedList.contains(a.toString())); + assertTrue(gossiper.getSeeds().contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); } // Set the seed node provider to return an empty list @@ -187,7 +226,7 @@ public class GossiperTest // Check that the in memory seed node list was not modified Assert.assertEquals(disjointSize, loadedList.size()); for (InetAddressAndPort a : disjointSeeds) - Assert.assertTrue(loadedList.contains(a.toString())); + assertTrue(loadedList.contains(a.toString())); // Change the seed provider to one that throws an unchecked exception DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider()); @@ -199,7 +238,7 @@ public class GossiperTest // Check that the in memory seed node list was not modified and the exception was caught Assert.assertEquals(disjointSize, gossiper.getSeeds().size()); for (InetAddressAndPort a : disjointSeeds) - Assert.assertTrue(gossiper.getSeeds().contains(a.toString())); + assertTrue(gossiper.getSeeds().contains(a.toString())); } static class TestSeedProvider implements SeedProvider --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
