This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a960d87e05f01000a758d8e7e5a58be57d13eb33 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Mon May 18 23:02:51 2020 +0200 add schema coordinator Co-authored-by: Stefan Miklosovic <stefan.mikloso...@instaclustr.com> --- src/java/org/apache/cassandra/config/Schema.java | 8 + .../cassandra/service/MigrationCoordinator.java | 501 +++++++++++++++++++++ .../apache/cassandra/service/MigrationManager.java | 124 +---- .../apache/cassandra/service/MigrationTask.java | 116 ----- .../apache/cassandra/service/StorageService.java | 69 ++- .../cassandra/service/StorageServiceMBean.java | 4 + .../service/MigrationCoordinatorTest.java | 319 +++++++++++++ 7 files changed, 895 insertions(+), 246 deletions(-) diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index 6d91d8d..2d50b32 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -574,6 +574,14 @@ public class Schema } /** + * Checks whether the current schema is empty. + */ + public boolean isEmpty() + { + return emptyVersion.equals(version); + } + + /** * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest * will be converted into UUID which would act as content-based version of the schema. */ diff --git a/src/java/org/apache/cassandra/service/MigrationCoordinator.java b/src/java/org/apache/cassandra/service/MigrationCoordinator.java new file mode 100644 index 0000000..18ffdbb --- /dev/null +++ b/src/java/org/apache/cassandra/service/MigrationCoordinator.java @@ -0,0 +1,501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.net.InetAddress; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.LocalAwareExecutorService; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import 
org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +public class MigrationCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinator.class); + private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); + private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null); + + private static final int MIGRATION_DELAY_IN_MS = 60000; + private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3; + + public static final MigrationCoordinator instance = new MigrationCoordinator(); + + static class VersionInfo + { + final UUID version; + + final Set<InetAddress> endpoints = Sets.newConcurrentHashSet(); + final Set<InetAddress> outstandingRequests = Sets.newConcurrentHashSet(); + final Deque<InetAddress> requestQueue = new ArrayDeque<>(); + + private final WaitQueue waitQueue = new WaitQueue(); + + volatile boolean receivedSchema; + + VersionInfo(UUID version) + { + this.version = version; + } + + WaitQueue.Signal register() + { + return waitQueue.register(); + } + + void markReceived() + { + if (receivedSchema) + return; + + receivedSchema = true; + waitQueue.signalAll(); + } + + boolean wasReceived() + { + return receivedSchema; + } + } + + private final Map<UUID, VersionInfo> versionInfo = new HashMap<>(); + private final Map<InetAddress, UUID> endpointVersions = new HashMap<>(); + + public void start() + { + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES); + } + + /** + * Forgets every known schema version and every endpoint's last reported version. + * The endpoint map has to be cleared together with the version map: reportEndpointVersion returns early + * when an endpoint re-reports the version already recorded for it, so stale entries would suppress + * every schema pull attempted after a reset. + */ + public synchronized void reset() + { + versionInfo.clear(); + endpointVersions.clear(); + } + + synchronized List<Future<Void>> pullUnreceivedSchemaVersions() + { + List<Future<Void>> futures = new ArrayList<>(); + for (VersionInfo info :
versionInfo.values()) + { + if (info.wasReceived() || info.outstandingRequests.size() > 0) + continue; + + Future<Void> future = maybePullSchema(info); + if (future != null && future != FINISHED_FUTURE) + futures.add(future); + } + + return futures; + } + + synchronized Future<Void> maybePullSchema(VersionInfo info) + { + if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version)) + return FINISHED_FUTURE; + + if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests()) + return FINISHED_FUTURE; + + for (int i=0, isize=info.requestQueue.size(); i<isize; i++) + { + InetAddress endpoint = info.requestQueue.remove(); + if (!info.endpoints.contains(endpoint)) + continue; + + if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint)) + { + return scheduleSchemaPull(endpoint, info); + } + else + { + // return to queue + info.requestQueue.offer(endpoint); + } + } + + // no suitable endpoints were found, check again in a minute, the periodic task will pick it up + return null; + } + + public synchronized Map<UUID, Set<InetAddress>> outstandingVersions() + { + HashMap<UUID, Set<InetAddress>> map = new HashMap<>(); + for (VersionInfo info : versionInfo.values()) + if (!info.wasReceived()) + map.put(info.version, ImmutableSet.copyOf(info.endpoints)); + return map; + } + + @VisibleForTesting + protected VersionInfo getVersionInfoUnsafe(UUID version) + { + return versionInfo.get(version); + } + + @VisibleForTesting + protected int getMaxOutstandingVersionRequests() + { + return MAX_OUTSTANDING_VERSION_REQUESTS; + } + + @VisibleForTesting + protected boolean isAlive(InetAddress endpoint) + { + return FailureDetector.instance.isAlive(endpoint); + } + + @VisibleForTesting + protected boolean shouldPullSchema(UUID version) + { + if (Schema.instance.getVersion() == null) + { + logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version); + return false; + } + + if 
(Schema.instance.getVersion().equals(version)) + { + logger.debug("Not pulling schema for version {}, because schema versions match: " + + "local={}, remote={}", + version, Schema.instance.getVersion(), + version); + return false; + } + return true; + } + + // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes + // from both 3.0 and 3.0.14. + private static boolean is30Compatible(int version) + { + return version == MessagingService.current_version || version == MessagingService.VERSION_3014; + } + + /** + * Decides whether a schema pull request may be sent to the given endpoint. + * Returns false for our own broadcast address, for endpoints with no gossip state or no RELEASE_VERSION + * in gossip, for endpoints whose release version does not start with our major version, for endpoints + * whose messaging version is unknown or not 3.0 compatible, and for gossip-only members. + * + * @param endpoint the candidate endpoint to pull schema from + * @return true if it is safe to request schema from the endpoint + */ + @VisibleForTesting + protected boolean shouldPullFromEndpoint(InetAddress endpoint) + { + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + return false; + + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + return false; + + // the release version may not have been gossiped yet; treat it like missing endpoint state instead of throwing an NPE + final VersionedValue releaseVersionValue = state.getApplicationState(ApplicationState.RELEASE_VERSION); + if (releaseVersionValue == null) + { + logger.debug("Not pulling schema from {} because its release version is not known in Gossip", endpoint); + return false; + } + + final String releaseVersion = releaseVersionValue.value; + final String ourMajorVersion = FBUtilities.getReleaseVersionMajor(); + + if (!releaseVersion.startsWith(ourMajorVersion)) + { + logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}", + endpoint, ourMajorVersion, releaseVersion); + return false; + } + + if (!MessagingService.instance().knowsVersion(endpoint)) + { + logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint); + return false; + } + + if (!is30Compatible(MessagingService.instance().getRawVersion(endpoint))) + { + logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint); + return false; + } + + if (Gossiper.instance.isGossipOnlyMember(endpoint)) + { + logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint); + return false; + } + return true; + } + + @VisibleForTesting + protected boolean shouldPullImmediately(InetAddress endpoint, UUID version) + { + if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() <
MIGRATION_DELAY_IN_MS) + { + // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately + logger.debug("Immediately submitting migration task for {}, " + + "schema versions: local={}, remote={}", + endpoint, + Schema.instance.getVersion(), + version); + return true; + } + return false; + } + + @VisibleForTesting + protected boolean isLocalVersion(UUID version) + { + return Schema.instance.getVersion().equals(version); + } + + /** + * If a previous schema update brought our version the same as the incoming schema, don't apply it + */ + synchronized boolean shouldApplySchemaFor(VersionInfo info) + { + if (info.wasReceived()) + return false; + return !isLocalVersion(info.version); + } + + synchronized Future<Void> reportEndpointVersion(InetAddress endpoint, UUID version) + { + UUID current = endpointVersions.put(endpoint, version); + if (current != null && current.equals(version)) + return FINISHED_FUTURE; + + VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new); + if (isLocalVersion(version)) + info.markReceived(); + info.endpoints.add(endpoint); + info.requestQueue.addFirst(endpoint); + + // disassociate this endpoint from its (now) previous schema version + removeEndpointFromVersion(endpoint, current); + + return maybePullSchema(info); + } + + Future<Void> reportEndpointVersion(InetAddress endpoint, EndpointState state) + { + if (state == null) + return FINISHED_FUTURE; + + VersionedValue version = state.getApplicationState(ApplicationState.SCHEMA); + + if (version == null) + return FINISHED_FUTURE; + + return reportEndpointVersion(endpoint, UUID.fromString(version.value)); + } + + private synchronized void removeEndpointFromVersion(InetAddress endpoint, UUID version) + { + if (version == null) + return; + + VersionInfo info = versionInfo.get(version); + + if (info == null) + return; + + info.endpoints.remove(endpoint); + if (info.endpoints.isEmpty()) + { + info.waitQueue.signalAll(); + 
versionInfo.remove(version); + } + } + + Future<Void> scheduleSchemaPull(InetAddress endpoint, VersionInfo info) + { + LocalAwareExecutorService stage = StageManager.getStage(Stage.MIGRATION); + FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null); + if (shouldPullImmediately(endpoint, info.version)) + { + stage.submit(task); + } + else + { + ScheduledExecutors.nonPeriodicTasks.schedule(() -> stage.submit(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); + } + return task; + } + + @VisibleForTesting + protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations) + { + SchemaKeyspace.mergeSchemaAndAnnounceVersion(mutations); + } + + class Callback implements IAsyncCallbackWithFailure<Collection<Mutation>> + { + final InetAddress endpoint; + final VersionInfo info; + + public Callback(InetAddress endpoint, VersionInfo info) + { + this.endpoint = endpoint; + this.info = info; + } + + public void onFailure(InetAddress from) + { + fail(); + } + + Future<Void> fail() + { + return pullComplete(endpoint, info, false); + } + + public void response(MessageIn<Collection<Mutation>> message) + { + response(message.payload); + } + + Future<Void> response(Collection<Mutation> mutations) + { + synchronized (info) + { + if (shouldApplySchemaFor(info)) + { + try + { + mergeSchemaFrom(endpoint, mutations); + } + catch (Exception e) + { + logger.error(String.format("Unable to merge schema from %s", endpoint), e); + return fail(); + } + } + return pullComplete(endpoint, info, true); + } + } + + public boolean isLatencyForSnitch() + { + return false; + } + } + + private void pullSchema(Callback callback) + { + if (!isAlive(callback.endpoint)) + { + logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint); + callback.fail(); + return; + } + + // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), + // potentially enough for the endpoint 
node to restart - which is an issue if it does restart upgraded, with + // a higher major. + if (!shouldPullFromEndpoint(callback.endpoint)) + { + logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint); + callback.fail(); + return; + } + + logger.debug("Requesting schema from {}", callback.endpoint); + sendMigrationMessage(callback); + } + + protected void sendMigrationMessage(Callback callback) + { + MessageOut<?> message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); + logger.info("Sending schema pull request to {} at {} with timeout {}", callback.endpoint, System.currentTimeMillis(), message.getTimeout()); + MessagingService.instance().sendRR(message, callback.endpoint, callback, message.getTimeout(), true); + } + + private synchronized Future<Void> pullComplete(InetAddress endpoint, VersionInfo info, boolean wasSuccessful) + { + if (wasSuccessful) + info.markReceived(); + + info.outstandingRequests.remove(endpoint); + info.requestQueue.add(endpoint); + return maybePullSchema(info); + } + + /** + * Wait until we've received schema responses for all versions we're aware of + * @param waitMillis + * @return true if response for all schemas were received, false if we timed out waiting + */ + public boolean awaitSchemaRequests(long waitMillis) + { + if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress())) + CassandraDaemon.waitForGossipToSettle(); + + WaitQueue.Signal signal = null; + try + { + synchronized (this) + { + List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size()); + for (VersionInfo version : versionInfo.values()) + { + if (version.wasReceived()) + continue; + + signalList.add(version.register()); + } + + if (signalList.isEmpty()) + return true; + + WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()]; + signalList.toArray(signals); + signal = WaitQueue.all(signals); + } + + 
return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + finally + { + if (signal != null) + signal.cancel(); + } + } +} diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 26b1aed..2b0c834 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -21,12 +21,9 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.*; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; @@ -56,12 +53,6 @@ public class MigrationManager public static final MigrationManager instance = new MigrationManager(); - private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); - - public static final int MIGRATION_DELAY_IN_MS = 60000; - - private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); - private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>(); private MigrationManager() {} @@ -76,86 +67,6 @@ public class MigrationManager listeners.remove(listener); } - public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) - { - VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA); - - if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null) - maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value); 
- } - - /** - * If versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. - */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion) - { - String ourMajorVersion = FBUtilities.getReleaseVersionMajor(); - if (!releaseVersion.startsWith(ourMajorVersion)) - { - logger.debug("Not pulling schema because release version in Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion); - return; - } - - if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint)) - { - logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); - return; - } - - if (Schema.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) - { - // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately - logger.debug("Submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - } - else - { - // Include a delay to make sure we have a chance to apply any changes being - // pushed out simultaneously. 
See CASSANDRA-5025 - Runnable runnable = () -> - { - // grab the latest version of the schema since it may have changed again since the initial scheduling - EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - if (epState == null) - { - logger.debug("epState vanished for {}, not submitting migration task", endpoint); - return; - } - VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA); - UUID currentVersion = UUID.fromString(value.value); - if (Schema.instance.getVersion().equals(currentVersion)) - { - logger.debug("not submitting migration task for {} because our versions match", endpoint); - return; - } - logger.debug("submitting migration task for {}", endpoint); - submitMigrationTask(endpoint); - }; - ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); - } - } - - private static Future<?> submitMigrationTask(InetAddress endpoint) - { - /* - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. - */ - return StageManager.getStage(Stage.MIGRATION).submit(new MigrationTask(endpoint)); - } - - public static boolean shouldPullSchemaFrom(InetAddress endpoint) - { - /* - * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) - * Don't request schema from fat clients - */ - return MessagingService.instance().knowsVersion(endpoint) - && is30Compatible(MessagingService.instance().getRawVersion(endpoint)) - && !Gossiper.instance.isGossipOnlyMember(endpoint); - } - // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes // from both 3.0 and 3.0.14. 
private static boolean is30Compatible(int version) @@ -163,29 +74,6 @@ public class MigrationManager return version == MessagingService.current_version || version == MessagingService.VERSION_3014; } - public static boolean isReadyForBootstrap() - { - return MigrationTask.getInflightTasks().isEmpty(); - } - - public static void waitUntilReadyForBootstrap() - { - CountDownLatch completionLatch; - while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) - { - try - { - if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) - logger.error("Migration task failed to complete"); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - logger.error("Migration task was interrupted"); - } - } - } - public void notifyCreateKeyspace(KeyspaceMetadata ksm) { for (MigrationListener listener : listeners) @@ -610,15 +498,15 @@ public class MigrationManager Set<InetAddress> liveEndpoints = Gossiper.instance.getLiveMembers(); liveEndpoints.remove(FBUtilities.getBroadcastAddress()); + MigrationCoordinator.instance.reset(); + // force migration if there are nodes around for (InetAddress node : liveEndpoints) { - if (shouldPullSchemaFrom(node)) - { - logger.debug("Requesting schema from {}", node); - FBUtilities.waitOnFuture(submitMigrationTask(node)); - break; - } + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node); + Future<Void> pull = MigrationCoordinator.instance.reportEndpointVersion(node, state); + if (pull != null) + FBUtilities.waitOnFuture(pull); } logger.info("Local schema reset is complete."); diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java deleted file mode 100644 index 6b04756..0000000 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service; - -import java.io.IOException; -import java.net.InetAddress; -import java.util.Collection; -import java.util.EnumSet; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.SystemKeyspace.BootstrapState; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.net.IAsyncCallback; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.utils.WrappedRunnable; - - -class MigrationTask extends WrappedRunnable -{ - private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); - - private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>(); - - private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); - 
- private final InetAddress endpoint; - - MigrationTask(InetAddress endpoint) - { - this.endpoint = endpoint; - } - - public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks() - { - return inflightTasks; - } - - public void runMayThrow() throws Exception - { - if (!FailureDetector.instance.isAlive(endpoint)) - { - logger.warn("Can't send schema pull request: node {} is down.", endpoint); - return; - } - - // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), - // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with - // a higher major. - if (!MigrationManager.shouldPullSchemaFrom(endpoint)) - { - logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint); - return; - } - - MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); - - final CountDownLatch completionLatch = new CountDownLatch(1); - - IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>() - { - @Override - public void response(MessageIn<Collection<Mutation>> message) - { - try - { - SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload); - } - catch (ConfigurationException e) - { - logger.error("Configuration exception merging remote schema", e); - } - finally - { - completionLatch.countDown(); - } - } - - public boolean isLatencyForSnitch() - { - return false; - } - }; - - // Only save the latches if we need bootstrap or are bootstrapping - if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) - inflightTasks.offer(completionLatch); - - MessagingService.instance().sendRR(message, endpoint, cb); - } -} diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c4309f8..3718e8c 100644 --- 
a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.management.*; import javax.management.openmbean.TabularData; @@ -112,6 +113,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private static final Logger logger = LoggerFactory.getLogger(StorageService.class); public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized + public static final int SCHEMA_DELAY = getRingDelay(); // delay after which we assume ring has stablized + + private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check"); private final JMXProgressSupport progressSupport = new JMXProgressSupport(this); @@ -134,6 +138,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return 30 * 1000; } + private static int getSchemaDelay() + { + String newdelay = System.getProperty("cassandra.schema_delay_ms"); + if (newdelay != null) + { + logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay); + return Integer.parseInt(newdelay); + } + else + { + return 30 * 1000; + } + } + /* This abstraction maintains the token/endpoint metadata information */ private TokenMetadata tokenMetadata = new TokenMetadata(); @@ -764,6 +782,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE @VisibleForTesting public void prepareToJoin() throws ConfigurationException { + MigrationCoordinator.instance.start(); if (!joined) { Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class); @@ -840,6 +859,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } + public 
void waitForSchema(int delay) + { + // first sleep the delay to make sure we see all our peers + for (long i = 0; i < delay; i += 1000) + { + // if we see schema, we can proceed to the next check directly + if (!Schema.instance.isEmpty()) + { + logger.debug("current schema version: {}", Schema.instance.getVersion()); + break; + } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + boolean schemasReceived = MigrationCoordinator.instance.awaitSchemaRequests(SCHEMA_DELAY); // SCHEMA_DELAY is already in milliseconds + + if (schemasReceived) + return; + + logger.warn(String.format("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " + + "our version : (%s), outstanding versions -> endpoints : %s", + Schema.instance.getVersion(), + MigrationCoordinator.instance.outstandingVersions())); + + if (REQUIRE_SCHEMAS) + throw new RuntimeException("Didn't receive schemas for all known versions within the timeout"); + } + @VisibleForTesting public void joinTokenRing(int delay) throws ConfigurationException { @@ -887,14 +934,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } - // if our schema hasn't matched yet, wait until it has - // we do this by waiting for all in-flight migration requests and responses to complete - // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) - if (!MigrationManager.isReadyForBootstrap()) - { - setMode(Mode.JOINING, "waiting for schema information to complete", true); - MigrationManager.waitUntilReadyForBootstrap(); - } + waitForSchema(delay); setMode(Mode.JOINING, "schema complete, ready to bootstrap", true); setMode(Mode.JOINING, "waiting for pending range calculation", true); PendingRangeCalculatorService.instance.blockUntilFinished(); @@ -1828,7 +1868,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE break;
case SCHEMA: SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor); - MigrationManager.instance.scheduleSchemaPull(endpoint, epState); + MigrationCoordinator.instance.reportEndpointVersion(endpoint, UUID.fromString(value.value)); break; case HOST_ID: SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor); @@ -2589,13 +2629,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { onChange(endpoint, entry.getKey(), entry.getValue()); } - MigrationManager.instance.scheduleSchemaPull(endpoint, epState); + MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState); } public void onAlive(InetAddress endpoint, EndpointState state) { - MigrationManager.instance.scheduleSchemaPull(endpoint, state); - if (tokenMetadata.isMember(endpoint)) notifyUp(endpoint); } @@ -4901,4 +4939,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE Runtime.getRuntime().removeShutdownHook(drainOnShutdown); } } + + @Override + public Map<String, Set<InetAddress>> getOutstandingSchemaVersions() + { + Map<UUID, Set<InetAddress>> outstanding = MigrationCoordinator.instance.outstandingVersions(); + return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Entry::getValue)); + } } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 1afa48e..dd534cb 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -634,4 +635,7 @@ public interface StorageServiceMBean extends NotificationEmitter /** Returns 
the max version that this node will negotiate for native protocol connections */ public int getMaxNativeProtocolVersion(); + + /** Returns a map of schema version -> set of endpoints reporting that version that we need schema updates for */ + public Map<String, Set<InetAddress>> getOutstandingSchemaVersions(); } diff --git a/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java new file mode 100644 index 0000000..070537d --- /dev/null +++ b/test/unit/org/apache/cassandra/service/MigrationCoordinatorTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Future; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import org.junit.Assert; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +import static com.google.common.util.concurrent.Futures.getUnchecked; + +public class MigrationCoordinatorTest +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class); + + private static final InetAddress EP1; + private static final InetAddress EP2; + private static final InetAddress EP3; + + private static final UUID LOCAL_VERSION = UUID.randomUUID(); + private static final UUID V1 = UUID.randomUUID(); + private static final UUID V2 = UUID.randomUUID(); + private static final UUID V3 = UUID.randomUUID(); + + static + { + try + { + EP1 = InetAddress.getByName("10.0.0.1"); + EP2 = InetAddress.getByName("10.0.0.2"); + EP3 = InetAddress.getByName("10.0.0.3"); + } + catch (UnknownHostException e) + { + throw new AssertionError(e); + } + + DatabaseDescriptor.forceStaticInitialization(); + } + + private static class InstrumentedCoordinator extends MigrationCoordinator + { + + Queue<Callback> requests = new LinkedList<>(); + @Override + protected void sendMigrationMessage(MigrationCoordinator.Callback callback) + { + requests.add(callback); + } + + boolean shouldPullSchema = true; + @Override + protected boolean shouldPullSchema(UUID version) + { + return 
shouldPullSchema; + } + + boolean shouldPullFromEndpoint = true; + @Override + protected boolean shouldPullFromEndpoint(InetAddress endpoint) + { + return shouldPullFromEndpoint; + } + + boolean shouldPullImmediately = true; + @Override + protected boolean shouldPullImmediately(InetAddress endpoint, UUID version) + { + return shouldPullImmediately; + } + + Set<InetAddress> deadNodes = new HashSet<>(); + protected boolean isAlive(InetAddress endpoint) + { + return !deadNodes.contains(endpoint); + } + + UUID localVersion = LOCAL_VERSION; + @Override + protected boolean isLocalVersion(UUID version) + { + return localVersion.equals(version); + } + + int maxOutstandingRequests = 3; + @Override + protected int getMaxOutstandingVersionRequests() + { + return maxOutstandingRequests; + } + + Set<InetAddress> mergedSchemasFrom = new HashSet<>(); + @Override + protected void mergeSchemaFrom(InetAddress endpoint, Collection<Mutation> mutations) + { + mergedSchemasFrom.add(endpoint); + } + } + + @Test + public void requestResponseCycle() throws InterruptedException + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + coordinator.maxOutstandingRequests = 1; + + Assert.assertTrue(coordinator.requests.isEmpty()); + + // first schema report should send a migration request + getUnchecked(coordinator.reportEndpointVersion(EP1, V1)); + Assert.assertEquals(1, coordinator.requests.size()); + Assert.assertFalse(coordinator.awaitSchemaRequests(1)); + + // second should not + getUnchecked(coordinator.reportEndpointVersion(EP2, V1)); + Assert.assertEquals(1, coordinator.requests.size()); + Assert.assertFalse(coordinator.awaitSchemaRequests(1)); + + // until the first request fails, then the second endpoint should be contacted + MigrationCoordinator.Callback request1 = coordinator.requests.poll(); + Assert.assertEquals(EP1, request1.endpoint); + getUnchecked(request1.fail()); + Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty()); + 
Assert.assertFalse(coordinator.awaitSchemaRequests(1)); + + // ... then the second endpoint should be contacted + Assert.assertEquals(1, coordinator.requests.size()); + MigrationCoordinator.Callback request2 = coordinator.requests.poll(); + Assert.assertEquals(EP2, request2.endpoint); + Assert.assertFalse(coordinator.awaitSchemaRequests(1)); + getUnchecked(request2.response(Collections.emptyList())); + Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom)); + Assert.assertTrue(coordinator.awaitSchemaRequests(1)); + + // and migration tasks should not be sent out for subsequent version reports + getUnchecked(coordinator.reportEndpointVersion(EP3, V1)); + Assert.assertTrue(coordinator.requests.isEmpty()); + + } + + /** + * If we don't send a request for a version, and endpoints associated with + * it all change versions, we should signal anyone waiting on that version + */ + @Test + public void versionsAreSignaledWhenDeleted() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + coordinator.reportEndpointVersion(EP1, V1); + WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register(); + Assert.assertFalse(signal.isSignalled()); + + coordinator.reportEndpointVersion(EP1, V2); + Assert.assertNull(coordinator.getVersionInfoUnsafe(V1)); + + Assert.assertTrue(signal.isSignalled()); + } + + private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddress endpoint, UUID version, boolean startupShouldBeUnblocked) + { + Assert.assertTrue(coordinator.requests.isEmpty()); + Future<Void> future = coordinator.reportEndpointVersion(endpoint, version); // use the caller-supplied endpoint/version, not hard-coded EP1/V1 + if (future != null) + getUnchecked(future); + Assert.assertTrue(coordinator.requests.isEmpty()); + + Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1)); + } + + private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked) + { + assertNoContact(coordinator, EP1, V1,
startupShouldBeUnblocked); + } + + @Test + public void dontContactNodesWithSameSchema() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + coordinator.localVersion = V1; + assertNoContact(coordinator, true); + } + + @Test + public void dontContactIncompatibleNodes() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + coordinator.shouldPullFromEndpoint = false; + assertNoContact(coordinator, false); + } + + @Test + public void dontContactDeadNodes() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + coordinator.deadNodes.add(EP1); + assertNoContact(coordinator, EP1, V1, false); + } + + /** + * If a node has become incompatible between when the task was scheduled and when it + * was run, we should detect that and fail the task + */ + @Test + public void testGossipRace() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator() { + protected boolean shouldPullImmediately(InetAddress endpoint, UUID version) + { + // this is the last thing that gets called before scheduling the pull, so set this flag here + shouldPullFromEndpoint = false; + return super.shouldPullImmediately(endpoint, version); + } + }; + + Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1)); + assertNoContact(coordinator, EP1, V1, false); + } + + @Test + public void testWeKeepSendingRequests() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + getUnchecked(coordinator.reportEndpointVersion(EP3, V2)); + coordinator.requests.remove().response(Collections.emptyList()); + + getUnchecked(coordinator.reportEndpointVersion(EP1, V1)); + getUnchecked(coordinator.reportEndpointVersion(EP2, V1)); + + MigrationCoordinator.Callback prev = null; + Set<InetAddress> EPs = Sets.newHashSet(EP1, EP2); + int ep1requests = 0; + int ep2requests = 0; + + for (int i=0; i<10; i++) + { + Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size()); + + MigrationCoordinator.Callback
next = coordinator.requests.remove(); + + // we should be contacting endpoints in a round robin fashion + Assert.assertTrue(EPs.contains(next.endpoint)); + if (prev != null && prev.endpoint.equals(next.endpoint)) + Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint)); + + // should send a new request + next.fail(); + prev = next; + Assert.assertFalse(coordinator.awaitSchemaRequests(1)); + + Assert.assertEquals(2, coordinator.requests.size()); + } + logger.info("{} -> {}", EP1, ep1requests); + logger.info("{} -> {}", EP2, ep2requests); + + // a single success should unblock startup though + coordinator.requests.remove().response(Collections.emptyList()); + Assert.assertTrue(coordinator.awaitSchemaRequests(1)); + + } + + /** + * Pull unreceived schemas should detect and send requests out for any + * schemas that are marked unreceived and have no outstanding requests + */ + @Test + public void pullUnreceived() + { + InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); + + coordinator.shouldPullFromEndpoint = false; + assertNoContact(coordinator, false); + + coordinator.shouldPullFromEndpoint = true; + Assert.assertEquals(0, coordinator.requests.size()); + List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions(); + futures.forEach(Futures::getUnchecked); + Assert.assertEquals(1, coordinator.requests.size()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org