Defer auth setup until all nodes are >= 2.2. Patch and review by Sylvain Lebresne and Sam Tunnicliffe for CASSANDRA-9761.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b22ad421 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b22ad421 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b22ad421 Branch: refs/heads/cassandra-3.0 Commit: b22ad4210cbf9c0c0bd6e595265b162f391da3a1 Parents: 813eb23 Author: Sylvain Lebresne <[email protected]> Authored: Tue Sep 8 11:05:46 2015 +0200 Committer: Sam Tunnicliffe <[email protected]> Committed: Thu Sep 10 12:44:12 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/auth/CassandraRoleManager.java | 83 +++++++++++++++----- .../apache/cassandra/net/MessagingService.java | 33 +++++++- 3 files changed, 95 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e1365b5..b927f95 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.2 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761) * Cancel transaction for sstables we wont redistribute index summary for (CASSANDRA-10270) * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/auth/CassandraRoleManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java index 802ae3c..9151958 100644 --- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java +++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java @@ -18,6 +18,7 @@ package org.apache.cassandra.auth; import java.util.*; +import 
java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import com.google.common.base.*; @@ -37,6 +38,7 @@ import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; @@ -124,6 +126,9 @@ public class CassandraRoleManager implements IRoleManager private final Set<Option> supportedOptions; private final Set<Option> alterableOptions; + // Will be set to true when all nodes in the cluster are on a version which supports roles (i.e. 2.2+) + private volatile boolean isClusterReady = false; + public CassandraRoleManager() { supportedOptions = DatabaseDescriptor.getAuthenticator().getClass() == PasswordAuthenticator.class @@ -149,21 +154,23 @@ public class CassandraRoleManager implements IRoleManager legacySelectUserStatement = (SelectStatement) prepare("SELECT * FROM %s.%s WHERE name = ?", AuthKeyspace.NAME, LEGACY_USERS_TABLE); - scheduleSetupTask(new Runnable() + scheduleSetupTask(new Callable<Void>() { - public void run() + public Void call() throws Exception { convertLegacyData(); + return null; } }); } else { - scheduleSetupTask(new Runnable() + scheduleSetupTask(new Callable<Void>() { - public void run() + public Void call() throws Exception { setupDefaultRole(); + return null; } }); } @@ -217,12 +224,12 @@ public class CassandraRoleManager implements IRoleManager Predicates.notNull())); if (!Strings.isNullOrEmpty(assignments)) { - QueryProcessor.process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'", - AuthKeyspace.NAME, - AuthKeyspace.ROLES, - assignments, - escape(role.getRoleName())), - consistencyForRole(role.getRoleName())); + process(String.format("UPDATE %s.%s SET %s WHERE role = '%s'", + 
AuthKeyspace.NAME, + AuthKeyspace.ROLES, + assignments, + escape(role.getRoleName())), + consistencyForRole(role.getRoleName())); } } @@ -278,10 +285,7 @@ public class CassandraRoleManager implements IRoleManager public Set<RoleResource> getAllRoles() throws RequestValidationException, RequestExecutionException { - UntypedResultSet rows = QueryProcessor.process(String.format("SELECT role from %s.%s", - AuthKeyspace.NAME, - AuthKeyspace.ROLES), - ConsistencyLevel.QUORUM); + UntypedResultSet rows = process(String.format("SELECT role from %s.%s", AuthKeyspace.NAME, AuthKeyspace.ROLES), ConsistencyLevel.QUORUM); Iterable<RoleResource> roles = Iterables.transform(rows, new Function<UntypedResultSet.Row, RoleResource>() { public RoleResource apply(UntypedResultSet.Row row) @@ -346,6 +350,7 @@ public class CassandraRoleManager implements IRoleManager catch (RequestExecutionException e) { logger.warn("CassandraRoleManager skipped default role setup: some nodes were not ready"); + throw e; } } @@ -354,15 +359,40 @@ public class CassandraRoleManager implements IRoleManager // Try looking up the 'cassandra' default role first, to avoid the range query if possible. 
String defaultSUQuery = String.format("SELECT * FROM %s.%s WHERE role = '%s'", AuthKeyspace.NAME, AuthKeyspace.ROLES, DEFAULT_SUPERUSER_NAME); String allUsersQuery = String.format("SELECT * FROM %s.%s LIMIT 1", AuthKeyspace.NAME, AuthKeyspace.ROLES); - return !process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty() - || !process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty() - || !process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty(); + return !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.ONE).isEmpty() + || !QueryProcessor.process(defaultSUQuery, ConsistencyLevel.QUORUM).isEmpty() + || !QueryProcessor.process(allUsersQuery, ConsistencyLevel.QUORUM).isEmpty(); } - private void scheduleSetupTask(Runnable runnable) + private void scheduleSetupTask(final Callable<Void> setupTask) { // The delay is to give the node a chance to see its peers before attempting the operation - ScheduledExecutors.optionalTasks.schedule(runnable, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); + ScheduledExecutors.optionalTasks.schedule(new Runnable() + { + public void run() + { + // If not all nodes are on 2.2, we don't want to initialize the role manager as this will confuse 2.1 + // nodes (see CASSANDRA-9761 for details). So we re-schedule the setup for later, hoping that the upgrade + // will be finished by then. + if (!MessagingService.instance().areAllNodesAtLeast22()) + { + logger.debug("Not all nodes are upgraded to a version that supports Roles yet, rescheduling setup task"); + scheduleSetupTask(setupTask); + return; + } + + isClusterReady = true; + try + { + setupTask.call(); + } + catch (Exception e) + { + logger.info("Setup task failed with error, rescheduling"); + scheduleSetupTask(setupTask); + } + } + }, AuthKeyspace.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS); } /* @@ -370,7 +400,7 @@ public class CassandraRoleManager implements IRoleManager * the new system_auth.roles table. 
This setup is not performed if AllowAllAuthenticator * is configured (see Auth#setup). */ - private void convertLegacyData() + private void convertLegacyData() throws Exception { try { @@ -413,6 +443,7 @@ public class CassandraRoleManager implements IRoleManager logger.info("Unable to complete conversion of legacy auth data (perhaps not enough nodes are upgraded yet). " + "Conversion should not be considered complete"); logger.debug("Conversion error", e); + throw e; } } @@ -567,8 +598,18 @@ public class CassandraRoleManager implements IRoleManager return StringUtils.replace(name, "'", "''"); } - private static UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestExecutionException + /** + * Executes the provided query. + * This shouldn't be used during setup as this will directly return an error if the manager is not setup yet. Setup tasks + * should use QueryProcessor.process directly. + */ + private UntypedResultSet process(String query, ConsistencyLevel consistencyLevel) throws RequestValidationException, RequestExecutionException { + if (!isClusterReady) + throw new InvalidRequestException("Cannot process role related query as the role manager isn't yet setup. " + + "This is likely because some of nodes in the cluster are on version 2.1 or earlier. 
" + + "You need to upgrade all nodes to Cassandra 2.2 or more to use roles."); + return QueryProcessor.process(query, consistencyLevel); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b22ad421/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 944dced..1f3240d 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -93,6 +93,8 @@ public final class MessagingService implements MessagingServiceMBean */ public static final int PROTOCOL_MAGIC = 0xCA552DFA; + private boolean allNodesAtLeast22 = true; + /* All verb handler identifiers */ public enum Verb { @@ -819,20 +821,49 @@ public final class MessagingService implements MessagingServiceMBean return packed >>> (start + 1) - count & ~(-1 << count); } + public boolean areAllNodesAtLeast22() + { + return allNodesAtLeast22; + } + /** * @return the last version associated with address, or @param version if this is the first such version */ public int setVersion(InetAddress endpoint, int version) { logger.debug("Setting version {} for {}", version, endpoint); + + if (version < VERSION_22) + allNodesAtLeast22 = false; + Integer v = versions.put(endpoint, version); + + // if the version was increased to 2.2 or later, see if all nodes are >= 2.2 now + if (v != null && v < VERSION_22 && version >= VERSION_22) + refreshAllNodesAtLeast22(); + return v == null ? 
version : v; } public void resetVersion(InetAddress endpoint) { logger.debug("Resetting version for {}", endpoint); - versions.remove(endpoint); + Integer removed = versions.remove(endpoint); + if (removed != null && removed <= VERSION_22) + refreshAllNodesAtLeast22(); + } + + private void refreshAllNodesAtLeast22() + { + for (Integer version: versions.values()) + { + if (version < VERSION_22) + { + allNodesAtLeast22 = false; + return; + } + } + allNodesAtLeast22 = true; } public int getVersion(InetAddress endpoint)
