This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 159e021aa36a44cdc7674dd234d4a6e189478280 Merge: 85f69cf fe70155 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Mon Nov 9 12:22:43 2020 -0800 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + .../cassandra/schema/MigrationCoordinator.java | 516 +++++++++++++++++++++ .../apache/cassandra/schema/MigrationManager.java | 149 +----- .../org/apache/cassandra/schema/MigrationTask.java | 111 ----- src/java/org/apache/cassandra/schema/Schema.java | 2 +- .../cassandra/schema/SchemaMigrationEvent.java | 7 +- .../apache/cassandra/service/StorageService.java | 63 ++- .../cassandra/service/StorageServiceMBean.java | 7 + .../cassandra/distributed/action/GossipHelper.java | 10 +- .../cassandra/schema/MigrationCoordinatorTest.java | 319 +++++++++++++ 10 files changed, 908 insertions(+), 277 deletions(-) diff --cc CHANGES.txt index f90ffd4,b81b0c8..1b08837 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,41 -1,10 +1,42 @@@ -3.11.10 +4.0-beta4 + * Add a ratelimiter to snapshot creation and deletion (CASSANDRA-13019) + * Produce consistent tombstone for reads to avoid digest mistmatch (CASSANDRA-15369) + * Fix SSTableloader issue when restoring a table named backups (CASSANDRA-16235) + * Invalid serialized size for responses caused by increasing message time by 1ms which caused extra bytes in size calculation (CASSANDRA-16103) + * Throw BufferOverflowException from DataOutputBuffer for better visibility (CASSANDRA-16214) + * TLS connections to the storage port on a node without server encryption configured causes java.io.IOException accessing missing keystore (CASSANDRA-16144) +Merged from 3.11: Merged from 3.0: - * Fix invalid cell value skipping when reading from disk (CASSANDRA-16223) ++ * Wait for schema agreement when bootstrapping (CASSANDRA-15158) * Prevent invoking enable/disable gossip when not in NORMAL (CASSANDRA-16146) -3.11.9 - * Synchronize Keyspace instance store/clear (CASSANDRA-16210) +4.0-beta3 + * Segregate Network and Chunk Cache BufferPools and Recirculate Partially Freed Chunks (CASSANDRA-15229) + * Fail truncation requests when they fail on a replica (CASSANDRA-16208) + * Move compact storage validation earlier in startup process (CASSANDRA-16063) + * Fix ByteBufferAccessor cast exceptions are thrown when trying to query a virtual table (CASSANDRA-16155) + * Consolidate node liveness check for forced repair (CASSANDRA-16113) + * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147) + * Abort repairs when getting a truncation request (CASSANDRA-15854) + * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457) + * Avoid failing compactions with very large partitions (CASSANDRA-15164) + * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131) + * Avoid invalid state transition exception during incremental repair (CASSANDRA-16067) + * Allow zero padding in timestamp serialization (CASSANDRA-16105) + * Add byte array backed cells (CASSANDRA-15393) + * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801) + * Avoid adding locahost when streaming trivial ranges (CASSANDRA-16099) + * Add nodetool getfullquerylog (CASSANDRA-15988) + * Fix yaml format and alignment in tpstats (CASSANDRA-11402) + * Avoid trying to keep track of RTs for endpoints we won't write to during read repair (CASSANDRA-16084) + * When compaction gets interrupted, the exception should include the compactionId (CASSANDRA-15954) + * Make Table/Keyspace Metric Names Consistent With Each Other (CASSANDRA-15909) + * Mutating 
sstable component may race with entire-sstable-streaming(ZCS) causing checksum validation failure (CASSANDRA-15861) + * NPE thrown while updating speculative execution time if keyspace is removed during task execution (CASSANDRA-15949) + * Show the progress of data streaming and index build (CASSANDRA-15406) + * Add flag to disable chunk cache and disable by default (CASSANDRA-16036) + * Upgrade to snakeyaml >= 1.26 version for CVE-2017-18640 fix (CASSANDRA-16150) +Merged from 3.11: * Fix ColumnFilter to avoid querying cells of unselected complex columns (CASSANDRA-15977) * Fix memory leak in CompressedChunkReader (CASSANDRA-15880) * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833) diff --cc src/java/org/apache/cassandra/schema/MigrationCoordinator.java index 0000000,0000000..9fb348e new file mode 100644 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java @@@ -1,0 -1,0 +1,516 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.schema; ++ ++import java.lang.management.ManagementFactory; ++import java.lang.management.RuntimeMXBean; ++import java.util.ArrayDeque; ++import java.util.ArrayList; ++import java.util.Collection; ++import java.util.Deque; ++import java.util.HashMap; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++import java.util.UUID; ++import java.util.concurrent.Future; ++import java.util.concurrent.FutureTask; ++import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicInteger; ++import java.util.function.LongSupplier; ++ ++import com.google.common.annotations.VisibleForTesting; ++import com.google.common.collect.ImmutableSet; ++import com.google.common.collect.Sets; ++import com.google.common.util.concurrent.Futures; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import org.apache.cassandra.concurrent.ScheduledExecutors; ++import org.apache.cassandra.concurrent.Stage; ++import org.apache.cassandra.db.Mutation; ++import org.apache.cassandra.exceptions.RequestFailureReason; ++import org.apache.cassandra.gms.ApplicationState; ++import org.apache.cassandra.gms.EndpointState; ++import org.apache.cassandra.gms.FailureDetector; ++import org.apache.cassandra.gms.Gossiper; ++import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.net.Message; ++import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.net.NoPayload; ++import org.apache.cassandra.net.RequestCallback; ++import org.apache.cassandra.net.Verb; ++import org.apache.cassandra.utils.FBUtilities; ++import org.apache.cassandra.utils.concurrent.WaitQueue; ++ ++public class MigrationCoordinator ++{ ++ private static final Logger logger = 
LoggerFactory.getLogger(MigrationCoordinator.class); ++ private static final Future<Void> FINISHED_FUTURE = Futures.immediateFuture(null); ++ ++ private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime(); ++ ++ @VisibleForTesting ++ public static void setUptimeFn(LongSupplier supplier) ++ { ++ getUptimeFn = supplier; ++ } ++ ++ ++ private static final int MIGRATION_DELAY_IN_MS = 60000; ++ private static final int MAX_OUTSTANDING_VERSION_REQUESTS = 3; ++ ++ public static final MigrationCoordinator instance = new MigrationCoordinator(); ++ ++ static class VersionInfo ++ { ++ final UUID version; ++ ++ final Set<InetAddressAndPort> endpoints = Sets.newConcurrentHashSet(); ++ final Set<InetAddressAndPort> outstandingRequests = Sets.newConcurrentHashSet(); ++ final Deque<InetAddressAndPort> requestQueue = new ArrayDeque<>(); ++ ++ private final WaitQueue waitQueue = new WaitQueue(); ++ ++ volatile boolean receivedSchema; ++ ++ VersionInfo(UUID version) ++ { ++ this.version = version; ++ } ++ ++ WaitQueue.Signal register() ++ { ++ return waitQueue.register(); ++ } ++ ++ void markReceived() ++ { ++ if (receivedSchema) ++ return; ++ ++ receivedSchema = true; ++ waitQueue.signalAll(); ++ } ++ ++ boolean wasReceived() ++ { ++ return receivedSchema; ++ } ++ } ++ ++ private final Map<UUID, VersionInfo> versionInfo = new HashMap<>(); ++ private final Map<InetAddressAndPort, UUID> endpointVersions = new HashMap<>(); ++ private final AtomicInteger inflightTasks = new AtomicInteger(); ++ ++ public void start() ++ { ++ ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::pullUnreceivedSchemaVersions, 1, 1, TimeUnit.MINUTES); ++ } ++ ++ public synchronized void reset() ++ { ++ versionInfo.clear(); ++ } ++ ++ synchronized List<Future<Void>> pullUnreceivedSchemaVersions() ++ { ++ List<Future<Void>> futures = new ArrayList<>(); ++ for (VersionInfo info : versionInfo.values()) ++ { ++ if (info.wasReceived() || info.outstandingRequests.size() > 0) ++ continue; ++ ++ Future<Void> future = maybePullSchema(info); ++ if (future != null && future != FINISHED_FUTURE) ++ futures.add(future); ++ } ++ ++ return futures; ++ } ++ ++ synchronized Future<Void> maybePullSchema(VersionInfo info) ++ { ++ if (info.endpoints.isEmpty() || info.wasReceived() || !shouldPullSchema(info.version)) ++ return FINISHED_FUTURE; ++ ++ if (info.outstandingRequests.size() >= getMaxOutstandingVersionRequests()) ++ return FINISHED_FUTURE; ++ ++ for (int i=0, isize=info.requestQueue.size(); i<isize; i++) ++ { ++ InetAddressAndPort endpoint = info.requestQueue.remove(); ++ if (!info.endpoints.contains(endpoint)) ++ continue; ++ ++ if (shouldPullFromEndpoint(endpoint) && info.outstandingRequests.add(endpoint)) ++ { ++ return scheduleSchemaPull(endpoint, info); ++ } ++ else ++ { ++ // return to queue ++ info.requestQueue.offer(endpoint); ++ } ++ } ++ ++ // no suitable endpoints were found, check again in a minute, the periodic task will pick it up ++ return null; ++ } ++ ++ public synchronized Map<UUID, Set<InetAddressAndPort>> outstandingVersions() ++ { ++ HashMap<UUID, Set<InetAddressAndPort>> map = new HashMap<>(); ++ for (VersionInfo info : versionInfo.values()) ++ if (!info.wasReceived()) ++ map.put(info.version, ImmutableSet.copyOf(info.endpoints)); ++ return map; ++ } ++ ++ @VisibleForTesting ++ protected VersionInfo getVersionInfoUnsafe(UUID version) ++ { ++ return versionInfo.get(version); ++ } ++ ++ @VisibleForTesting ++ protected int getMaxOutstandingVersionRequests() ++ { ++ return 
MAX_OUTSTANDING_VERSION_REQUESTS; ++ } ++ ++ @VisibleForTesting ++ protected boolean isAlive(InetAddressAndPort endpoint) ++ { ++ return FailureDetector.instance.isAlive(endpoint); ++ } ++ ++ @VisibleForTesting ++ protected boolean shouldPullSchema(UUID version) ++ { ++ if (Schema.instance.getVersion() == null) ++ { ++ logger.debug("Not pulling schema for version {}, because local schama version is not known yet", version); ++ return false; ++ } ++ ++ if (Schema.instance.isSameVersion(version)) ++ { ++ logger.debug("Not pulling schema for version {}, because schema versions match: " + ++ "local={}, remote={}", ++ version, ++ Schema.schemaVersionToString(Schema.instance.getVersion()), ++ Schema.schemaVersionToString(version)); ++ return false; ++ } ++ return true; ++ } ++ ++ // Since 3.0.14 protocol contains only a CASSANDRA-13004 bugfix, it is safe to accept schema changes ++ // from both 3.0 and 3.0.14. ++ private static boolean is30Compatible(int version) ++ { ++ return version == MessagingService.current_version || version == MessagingService.VERSION_3014; ++ } ++ ++ @VisibleForTesting ++ protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint) ++ { ++ if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort())) ++ return false; ++ ++ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); ++ if (state == null) ++ return false; ++ ++ final String releaseVersion = state.getApplicationState(ApplicationState.RELEASE_VERSION).value; ++ final String ourMajorVersion = FBUtilities.getReleaseVersionMajor(); ++ ++ if (!releaseVersion.startsWith(ourMajorVersion)) ++ { ++ logger.debug("Not pulling schema from {} because release version in Gossip is not major version {}, it is {}", ++ endpoint, ourMajorVersion, releaseVersion); ++ return false; ++ } ++ ++ if (!MessagingService.instance().versions.knows(endpoint)) ++ { ++ logger.debug("Not pulling schema from {} because their messaging version is unknown", endpoint); ++ return false; ++ } ++ ++ if (MessagingService.instance().versions.getRaw(endpoint) != MessagingService.current_version) ++ { ++ logger.debug("Not pulling schema from {} because their schema format is incompatible", endpoint); ++ return false; ++ } ++ ++ if (Gossiper.instance.isGossipOnlyMember(endpoint)) ++ { ++ logger.debug("Not pulling schema from {} because it's a gossip only member", endpoint); ++ return false; ++ } ++ return true; ++ } ++ ++ @VisibleForTesting ++ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version) ++ { ++ if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS) ++ { ++ // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately ++ logger.debug("Immediately submitting migration task for {}, " + ++ "schema versions: local={}, remote={}", ++ endpoint, ++ Schema.schemaVersionToString(Schema.instance.getVersion()), ++ Schema.schemaVersionToString(version)); ++ return true; ++ } ++ return false; ++ } ++ ++ @VisibleForTesting ++ protected boolean isLocalVersion(UUID version) ++ { ++ return Schema.instance.isSameVersion(version); ++ } ++ ++ /** ++ * If a previous schema update brought our version the same as the incoming schema, don't apply it ++ */ ++ synchronized boolean shouldApplySchemaFor(VersionInfo info) ++ { ++ if (info.wasReceived()) ++ return false; ++ return !isLocalVersion(info.version); ++ } ++ ++ public synchronized Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, UUID version) ++ { ++ UUID current = 
endpointVersions.put(endpoint, version); ++ if (current != null && current.equals(version)) ++ return FINISHED_FUTURE; ++ ++ VersionInfo info = versionInfo.computeIfAbsent(version, VersionInfo::new); ++ if (isLocalVersion(version)) ++ info.markReceived(); ++ info.endpoints.add(endpoint); ++ info.requestQueue.addFirst(endpoint); ++ ++ // disassociate this endpoint from its (now) previous schema version ++ removeEndpointFromVersion(endpoint, current); ++ ++ return maybePullSchema(info); ++ } ++ ++ public Future<Void> reportEndpointVersion(InetAddressAndPort endpoint, EndpointState state) ++ { ++ if (state == null) ++ return FINISHED_FUTURE; ++ ++ UUID version = state.getSchemaVersion(); ++ ++ if (version == null) ++ return FINISHED_FUTURE; ++ ++ return reportEndpointVersion(endpoint, version); ++ } ++ ++ private synchronized void removeEndpointFromVersion(InetAddressAndPort endpoint, UUID version) ++ { ++ if (version == null) ++ return; ++ ++ VersionInfo info = versionInfo.get(version); ++ ++ if (info == null) ++ return; ++ ++ info.endpoints.remove(endpoint); ++ if (info.endpoints.isEmpty()) ++ { ++ info.waitQueue.signalAll(); ++ versionInfo.remove(version); ++ } ++ } ++ ++ Future<Void> scheduleSchemaPull(InetAddressAndPort endpoint, VersionInfo info) ++ { ++ FutureTask<Void> task = new FutureTask<>(() -> pullSchema(new Callback(endpoint, info)), null); ++ if (shouldPullImmediately(endpoint, info.version)) ++ { ++ Stage.MIGRATION.submit(task); ++ } ++ else ++ { ++ ScheduledExecutors.nonPeriodicTasks.schedule(() -> Stage.MIGRATION.submit(task), MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); ++ } ++ return task; ++ } ++ ++ @VisibleForTesting ++ protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations) ++ { ++ Schema.instance.mergeAndAnnounceVersion(mutations); ++ } ++ ++ class Callback implements RequestCallback<Collection<Mutation>> ++ { ++ final InetAddressAndPort endpoint; ++ final VersionInfo info; ++ ++ public Callback(InetAddressAndPort endpoint, VersionInfo info) ++ { ++ this.endpoint = endpoint; ++ this.info = info; ++ } ++ ++ public void onFailure(InetAddressAndPort from, RequestFailureReason failureReason) ++ { ++ fail(); ++ } ++ ++ Future<Void> fail() ++ { ++ return pullComplete(endpoint, info, false); ++ } ++ ++ public void onResponse(Message<Collection<Mutation>> message) ++ { ++ response(message.payload); ++ } ++ ++ Future<Void> response(Collection<Mutation> mutations) ++ { ++ synchronized (info) ++ { ++ if (shouldApplySchemaFor(info)) ++ { ++ try ++ { ++ mergeSchemaFrom(endpoint, mutations); ++ } ++ catch (Exception e) ++ { ++ logger.error(String.format("Unable to merge schema from %s", endpoint), e); ++ return fail(); ++ } ++ } ++ return pullComplete(endpoint, info, true); ++ } ++ } ++ ++ public boolean isLatencyForSnitch() ++ { ++ return false; ++ } ++ } ++ ++ private void pullSchema(Callback callback) ++ { ++ if (!isAlive(callback.endpoint)) ++ { ++ logger.warn("Can't send schema pull request: node {} is down.", callback.endpoint); ++ callback.fail(); ++ return; ++ } ++ ++ // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(), ++ // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with ++ // a higher major. 
++ if (!shouldPullFromEndpoint(callback.endpoint)) ++ { ++ logger.info("Skipped sending a migration request: node {} has a higher major version now.", callback.endpoint); ++ callback.fail(); ++ return; ++ } ++ ++ logger.debug("Requesting schema from {}", callback.endpoint); ++ sendMigrationMessage(callback); ++ } ++ ++ protected void sendMigrationMessage(Callback callback) ++ { ++ inflightTasks.getAndIncrement(); ++ Message message = Message.out(Verb.SCHEMA_PULL_REQ, NoPayload.noPayload); ++ logger.info("Sending schema pull request to {}", callback.endpoint); ++ MessagingService.instance().sendWithCallback(message, callback.endpoint, callback); ++ } ++ ++ private synchronized Future<Void> pullComplete(InetAddressAndPort endpoint, VersionInfo info, boolean wasSuccessful) ++ { ++ inflightTasks.decrementAndGet(); ++ if (wasSuccessful) ++ info.markReceived(); ++ ++ info.outstandingRequests.remove(endpoint); ++ info.requestQueue.add(endpoint); ++ return maybePullSchema(info); ++ } ++ ++ public int getInflightTasks() ++ { ++ return inflightTasks.get(); ++ } ++ ++ /** ++ * Wait until we've received schema responses for all versions we're aware of ++ * @param waitMillis ++ * @return true if response for all schemas were received, false if we timed out waiting ++ */ ++ public boolean awaitSchemaRequests(long waitMillis) ++ { ++ if (!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress())) ++ Gossiper.waitToSettle(); ++ ++ WaitQueue.Signal signal = null; ++ try ++ { ++ synchronized (this) ++ { ++ List<WaitQueue.Signal> signalList = new ArrayList<>(versionInfo.size()); ++ for (VersionInfo version : versionInfo.values()) ++ { ++ if (version.wasReceived()) ++ continue; ++ ++ signalList.add(version.register()); ++ } ++ ++ if (signalList.isEmpty()) ++ return true; ++ ++ WaitQueue.Signal[] signals = new WaitQueue.Signal[signalList.size()]; ++ signalList.toArray(signals); ++ signal = WaitQueue.all(signals); ++ } ++ ++ return signal.awaitUntil(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis)); ++ } ++ catch (InterruptedException e) ++ { ++ throw new RuntimeException(e); ++ } ++ finally ++ { ++ if (signal != null) ++ signal.cancel(); ++ } ++ } ++} diff --cc src/java/org/apache/cassandra/schema/MigrationManager.java index 8f627cd,0000000..c2ec511 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@@ -1,501 -1,0 +1,360 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.schema; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.lang.management.ManagementFactory; +import java.util.function.LongSupplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.db.*; +import org.apache.cassandra.exceptions.AlreadyExistsException; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.*; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.concurrent.Stage.MIGRATION; +import static org.apache.cassandra.net.Verb.SCHEMA_PUSH_REQ; + +public class MigrationManager +{ + private static final Logger logger = LoggerFactory.getLogger(MigrationManager.class); + + public static final MigrationManager instance = new MigrationManager(); + + private static LongSupplier getUptimeFn = () -> ManagementFactory.getRuntimeMXBean().getUptime(); + + @VisibleForTesting + public static void setUptimeFn(LongSupplier supplier) + { + getUptimeFn = supplier; + } + + private static final int MIGRATION_DELAY_IN_MS = 60000; + + private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1")); + + private MigrationManager() {} + - public static void scheduleSchemaPull(InetAddressAndPort endpoint, EndpointState state) - { - UUID schemaVersion = state.getSchemaVersion(); - if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && schemaVersion != null) - maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value); - } - - /** - * If versions differ this node sends request with local migration list to the endpoint - * and expecting to receive a list of migrations to apply locally. 
- */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddressAndPort endpoint, String releaseVersion) - { - String ourMajorVersion = FBUtilities.getReleaseVersionMajor(); - if (!releaseVersion.startsWith(ourMajorVersion)) - { - logger.debug("Not pulling schema because release version in Gossip is not major version {}, it is {}", ourMajorVersion, releaseVersion); - return; - } - if (Schema.instance.getVersion() == null) - { - logger.debug("Not pulling schema from {}, because local schema version is not known yet", - endpoint); - SchemaMigrationDiagnostics.unknownLocalSchemaVersion(endpoint, theirVersion); - return; - } - if (Schema.instance.isSameVersion(theirVersion)) - { - logger.debug("Not pulling schema from {}, because schema versions match ({})", - endpoint, - Schema.schemaVersionToString(theirVersion)); - SchemaMigrationDiagnostics.versionMatch(endpoint, theirVersion); - return; - } - if (!shouldPullSchemaFrom(endpoint)) - { - logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false", - endpoint, Schema.instance.getVersion(), theirVersion); - SchemaMigrationDiagnostics.skipPull(endpoint, theirVersion); - return; - } - - if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < MIGRATION_DELAY_IN_MS) - { - // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately - logger.debug("Immediately submitting migration task for {}, " + - "schema versions: local={}, remote={}", - endpoint, - Schema.schemaVersionToString(Schema.instance.getVersion()), - Schema.schemaVersionToString(theirVersion)); - submitMigrationTask(endpoint); - } - else - { - // Include a delay to make sure we have a chance to apply any changes being - // pushed out simultaneously. See CASSANDRA-5025 - Runnable runnable = () -> - { - // grab the latest version of the schema since it may have changed again since the initial scheduling - UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint); - if (epSchemaVersion == null) - { - logger.debug("epState vanished for {}, not submitting migration task", endpoint); - return; - } - if (Schema.instance.isSameVersion(epSchemaVersion)) - { - logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion); - return; - } - logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}", - endpoint, - Schema.schemaVersionToString(Schema.instance.getVersion()), - Schema.schemaVersionToString(epSchemaVersion)); - submitMigrationTask(endpoint); - }; - ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS); - } - } - - private static Future<?> submitMigrationTask(InetAddressAndPort endpoint) - { - /* - * Do not de-ref the future because that causes distributed deadlock (CASSANDRA-3832) because we are - * running in the gossip stage. 
- */ - return MIGRATION.submit(new MigrationTask(endpoint)); - } - - static boolean shouldPullSchemaFrom(InetAddressAndPort endpoint) - { - /* - * Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema) - * Don't request schema from fat clients - */ - return MessagingService.instance().versions.knows(endpoint) - && MessagingService.instance().versions.getRaw(endpoint) == MessagingService.current_version - && !Gossiper.instance.isGossipOnlyMember(endpoint); - } - + private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint) + { + // only push schema to nodes with known and equal versions + return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) + && MessagingService.instance().versions.knows(endpoint) + && MessagingService.instance().versions.getRaw(endpoint) == MessagingService.current_version; + } + - public static boolean isReadyForBootstrap() - { - return MigrationTask.getInflightTasks().isEmpty(); - } - - public static void waitUntilReadyForBootstrap() - { - CountDownLatch completionLatch; - while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) - { - try - { - if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) - logger.error("Migration task failed to complete"); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - logger.error("Migration task was interrupted"); - } - } - } - + public static void announceNewKeyspace(KeyspaceMetadata ksm) throws ConfigurationException + { + announceNewKeyspace(ksm, false); + } + + public static void announceNewKeyspace(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException + { + announceNewKeyspace(ksm, FBUtilities.timestampMicros(), announceLocally); + } + + public static void announceNewKeyspace(KeyspaceMetadata ksm, long timestamp, boolean announceLocally) throws ConfigurationException + { + ksm.validate(); + + if (Schema.instance.getKeyspaceMetadata(ksm.name) != null) + throw new AlreadyExistsException(ksm.name); + + logger.info("Create new Keyspace: {}", ksm); + announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); + } + + public static void announceNewTable(TableMetadata cfm) + { + announceNewTable(cfm, true, FBUtilities.timestampMicros()); + } + - /** - * Announces the table even if the definition is already know locally. - * This should generally be avoided but is used internally when we want to force the most up to date version of - * a system table schema (Note that we don't know if the schema we force _is_ the most recent version or not, we - * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceTableUpdate - * it would for instance delete new columns if this is not called with the most up-to-date version) - * - * Note that this is only safe for system tables where we know the id is fixed and will be the same whatever version - * of the definition is used. 
- */ - public static void forceAnnounceNewTable(TableMetadata cfm) - { - announceNewTable(cfm, false, 0); - } - + private static void announceNewTable(TableMetadata cfm, boolean throwOnDuplicate, long timestamp) + { + cfm.validate(); + + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(cfm.keyspace); + if (ksm == null) + throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", cfm.name, cfm.keyspace)); + // If we have a table or a view which has the same name, we can't add a new one + else if (throwOnDuplicate && ksm.getTableOrViewNullable(cfm.name) != null) + throw new AlreadyExistsException(cfm.keyspace, cfm.name); + + logger.info("Create new table: {}", cfm); + announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), false); + } + + static void announceKeyspaceUpdate(KeyspaceMetadata ksm) + { + ksm.validate(); + + KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksm.name); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); + + logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm); + announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, FBUtilities.timestampMicros()), false); + } + + public static void announceTableUpdate(TableMetadata tm) + { + announceTableUpdate(tm, false); + } + + public static void announceTableUpdate(TableMetadata updated, boolean announceLocally) + { + updated.validate(); + + TableMetadata current = Schema.instance.getTableMetadata(updated.keyspace, updated.name); + if (current == null) + throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(current.keyspace); + + updated.validateCompatibility(current); + + long timestamp = FBUtilities.timestampMicros(); + + logger.info("Update table '{}/{}' From {} To {}", current.keyspace, current.name, current, updated); + Mutation.SimpleBuilder builder = SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, timestamp); + + announce(builder, announceLocally); + } + + static void announceKeyspaceDrop(String ksName) + { + KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName); + if (oldKsm == null) + throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); + + logger.info("Drop Keyspace '{}'", oldKsm.name); + announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), false); + } + + public static void announceTableDrop(String ksName, String cfName, boolean announceLocally) + { + TableMetadata tm = Schema.instance.getTableMetadata(ksName, cfName); + if (tm == null) + throw new ConfigurationException(String.format("Cannot drop non existing table '%s' in keyspace '%s'.", cfName, ksName)); + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); + + logger.info("Drop table '{}/{}'", tm.keyspace, tm.name); + announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, FBUtilities.timestampMicros()), announceLocally); + } + + /** + * actively announce a new version to active hosts via rpc + * @param schema The schema mutation to be applied + */ + private static void announce(Mutation.SimpleBuilder schema, boolean announceLocally) + { + List<Mutation> mutations = Collections.singletonList(schema.build()); + + if (announceLocally) + Schema.instance.merge(mutations); + else + announce(mutations); + } 
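The MigrationManager methods removed above (submitMigrationTask, shouldPullSchemaFrom, waitUntilReadyForBootstrap) are superseded by the new MigrationCoordinator, which records each peer's gossiped schema version and keeps pulling until every known version has been received. As a rough illustration only, not code from this commit, here is a minimal sketch of how calling code might drive that API; it assumes an already-initialized node, and the peer address, timeout, and class name are made up for the example:

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.MigrationCoordinator;

public class SchemaPullSketch
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical peer address; on a real node this comes from gossip state.
        InetAddressAndPort peer = InetAddressAndPort.getByName("127.0.0.2:7000");

        // Stand-in for the schema version the peer advertised via gossip.
        UUID remoteVersion = UUID.randomUUID();

        // Reporting a peer's version queues a schema pull if it differs from the local version.
        MigrationCoordinator.instance.reportEndpointVersion(peer, remoteVersion);

        // Bootstrap-style wait: block until every known remote version has been received,
        // or the timeout elapses (awaitSchemaRequests returns false on timeout).
        boolean inSync = MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(30));
        if (!inSync)
            System.err.println("Timed out waiting for schema; outstanding: "
                               + MigrationCoordinator.instance.outstandingVersions());
    }
}
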
+ + public static void announce(Mutation change) + { + announce(Collections.singleton(change)); + } + + public static void announce(Collection<Mutation> schema) + { + Future<?> f = MIGRATION.submit(() -> Schema.instance.mergeAndAnnounceVersion(schema)); + + Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>(); + Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>(); + Message<Collection<Mutation>> message = Message.out(SCHEMA_PUSH_REQ, schema); + for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers()) + { + if (shouldPushSchemaTo(endpoint)) + { + MessagingService.instance().send(message, endpoint); + schemaDestinationEndpoints.add(endpoint); + } + else + { + schemaEndpointsIgnored.add(endpoint); + } + } + + SchemaAnnouncementDiagnostics.schemaMutationsAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored); + FBUtilities.waitOnFuture(f); + } + + public static KeyspacesDiff announce(SchemaTransformation transformation, boolean locally) + { + long now = FBUtilities.timestampMicros(); + + Future<Schema.TransformationResult> future = + MIGRATION.submit(() -> Schema.instance.transform(transformation, locally, now)); + + Schema.TransformationResult result = Futures.getUnchecked(future); + if (!result.success) + throw result.exception; + + if (locally || result.diff.isEmpty()) + return result.diff; + + Set<InetAddressAndPort> schemaDestinationEndpoints = new HashSet<>(); + Set<InetAddressAndPort> schemaEndpointsIgnored = new HashSet<>(); + Message<Collection<Mutation>> message = Message.out(SCHEMA_PUSH_REQ, result.mutations); + for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers()) + { + if (shouldPushSchemaTo(endpoint)) + { + MessagingService.instance().send(message, endpoint); + schemaDestinationEndpoints.add(endpoint); + } + else + { + schemaEndpointsIgnored.add(endpoint); + } + } + + SchemaAnnouncementDiagnostics.schemaTransformationAnnounced(schemaDestinationEndpoints, schemaEndpointsIgnored, + transformation); + + return result.diff; + } + + /** + * Clear all locally stored schema information and reset schema to initial state. + * Called by user (via JMX) who wants to get rid of schema disagreement. + */ + public static void resetLocalSchema() + { + logger.info("Starting local schema reset..."); + + logger.debug("Truncating schema tables..."); + + SchemaMigrationDiagnostics.resetLocalSchema(); + + SchemaKeyspace.truncate(); + + logger.debug("Clearing local schema keyspace definitions..."); + + Schema.instance.clear(); + + Set<InetAddressAndPort> liveEndpoints = Gossiper.instance.getLiveMembers(); + liveEndpoints.remove(FBUtilities.getBroadcastAddressAndPort()); + + // force migration if there are nodes around + for (InetAddressAndPort node : liveEndpoints) + { - if (shouldPullSchemaFrom(node)) - { - logger.debug("Requesting schema from {}", node); - FBUtilities.waitOnFuture(submitMigrationTask(node)); - break; - } ++ EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(node); ++ Future<Void> pull = MigrationCoordinator.instance.reportEndpointVersion(node, state); ++ if (pull != null) ++ FBUtilities.waitOnFuture(pull); + } + + logger.info("Local schema reset is complete."); + } + + /** + * We have a set of non-local, distributed system keyspaces, e.g. system_traces, system_auth, etc. + * (see {@link SchemaConstants#REPLICATED_SYSTEM_KEYSPACE_NAMES}), that need to be created on cluster initialisation, + * and later evolved on major upgrades (sometimes minor too). 
This method compares the current known definitions + * of the tables (if the keyspace exists) to the expected, most modern ones expected by the running version of C*; + * if any changes have been detected, a schema Mutation will be created which, when applied, should make + * cluster's view of that keyspace aligned with the expected modern definition. + * + * @param keyspace the expected modern definition of the keyspace + * @param generation timestamp to use for the table changes in the schema mutation + * + * @return empty Optional if the current definition is up to date, or an Optional with the Mutation that would + * bring the schema in line with the expected definition. + */ + public static Optional<Mutation> evolveSystemKeyspace(KeyspaceMetadata keyspace, long generation) + { + Mutation.SimpleBuilder builder = null; + + KeyspaceMetadata definedKeyspace = Schema.instance.getKeyspaceMetadata(keyspace.name); + Tables definedTables = null == definedKeyspace ? Tables.none() : definedKeyspace.tables; + + for (TableMetadata table : keyspace.tables) + { + if (table.equals(definedTables.getNullable(table.name))) + continue; + + if (null == builder) + { + // for the keyspace definition itself (name, replication, durability) always use generation 0; + // this ensures that any changes made to replication by the user will never be overwritten. + builder = SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, 0); + + // now set the timestamp to generation, so the tables have the expected timestamp + builder.timestamp(generation); + } + + // for table definitions always use the provided generation; these tables, unlike their containing + // keyspaces, are *NOT* meant to be altered by the user; if their definitions need to change, + // the schema must be updated in code, and the appropriate generation must be bumped. + SchemaKeyspace.addTableToSchemaMutation(table, true, builder); + } + + return builder == null ? Optional.empty() : Optional.of(builder.build()); + } + + public static class MigrationsSerializer implements IVersionedSerializer<Collection<Mutation>> + { + public static MigrationsSerializer instance = new MigrationsSerializer(); + + public void serialize(Collection<Mutation> schema, DataOutputPlus out, int version) throws IOException + { + out.writeInt(schema.size()); + for (Mutation mutation : schema) + Mutation.serializer.serialize(mutation, out, version); + } + + public Collection<Mutation> deserialize(DataInputPlus in, int version) throws IOException + { + int count = in.readInt(); + Collection<Mutation> schema = new ArrayList<>(count); + + for (int i = 0; i < count; i++) + schema.add(Mutation.serializer.deserialize(in, version)); + + return schema; + } + + public long serializedSize(Collection<Mutation> schema, int version) + { + int size = TypeSizes.sizeof(schema.size()); + for (Mutation mutation : schema) + size += mutation.serializedSize(version); + return size; + } + } +} diff --cc src/java/org/apache/cassandra/schema/Schema.java index 9c0c590,0000000..c04c631 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@@ -1,890 -1,0 +1,890 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Sets; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.locator.LocalStrategy; +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.Pair; +import org.cliffc.high_scale_lib.NonBlockingHashMap; + +import static java.lang.String.format; + +import static com.google.common.collect.Iterables.size; + +public final class Schema implements SchemaProvider +{ + public static final Schema instance = new Schema(); + + private volatile Keyspaces keyspaces = Keyspaces.none(); + + // UUID -> mutable metadata ref map. We have to update these in place every time a table changes. + private final Map<TableId, TableMetadataRef> metadataRefs = new NonBlockingHashMap<>(); + + // (keyspace name, index name) -> mutable metadata ref map. We have to update these in place every time an index changes. + private final Map<Pair<String, String>, TableMetadataRef> indexMetadataRefs = new NonBlockingHashMap<>(); + + // Keyspace objects, one per keyspace. Only one instance should ever exist for any given keyspace. + private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<>(); + + private volatile UUID version; + + private final List<SchemaChangeListener> changeListeners = new CopyOnWriteArrayList<>(); + + /** + * Initialize empty schema object and load the hardcoded system tables + */ + private Schema() + { + if (DatabaseDescriptor.isDaemonInitialized() || DatabaseDescriptor.isToolInitialized()) + { + load(SchemaKeyspace.metadata()); + load(SystemKeyspace.metadata()); + } + } + + /** + * load keyspace (keyspace) definitions, but do not initialize the keyspace instances. + * Schema version may be updated as the result. + */ + public void loadFromDisk() + { + loadFromDisk(true); + } + + /** + * Load schema definitions from disk. 
+ * + * @param updateVersion true if schema version needs to be updated + */ + public void loadFromDisk(boolean updateVersion) + { + SchemaDiagnostics.schemataLoading(this); + SchemaKeyspace.fetchNonSystemKeyspaces().forEach(this::load); + if (updateVersion) + updateVersion(); + SchemaDiagnostics.schemataLoaded(this); + } + + /** + * Update (or insert) new keyspace definition + * + * @param ksm The metadata about keyspace + */ + synchronized public void load(KeyspaceMetadata ksm) + { + KeyspaceMetadata previous = keyspaces.getNullable(ksm.name); + + if (previous == null) + loadNew(ksm); + else + reload(previous, ksm); + + keyspaces = keyspaces.withAddedOrUpdated(ksm); + } + + private void loadNew(KeyspaceMetadata ksm) + { + ksm.tablesAndViews() + .forEach(metadata -> metadataRefs.put(metadata.id, new TableMetadataRef(metadata))); + + ksm.tables + .indexTables() + .forEach((name, metadata) -> indexMetadataRefs.put(Pair.create(ksm.name, name), new TableMetadataRef(metadata))); + + SchemaDiagnostics.metadataInitialized(this, ksm); + } + + private void reload(KeyspaceMetadata previous, KeyspaceMetadata updated) + { + Keyspace keyspace = getKeyspaceInstance(updated.name); + if (null != keyspace) + keyspace.setMetadata(updated); + + Tables.TablesDiff tablesDiff = Tables.diff(previous.tables, updated.tables); + Views.ViewsDiff viewsDiff = Views.diff(previous.views, updated.views); + + MapDifference<String, TableMetadata> indexesDiff = previous.tables.indexesDiff(updated.tables); + + // clean up after removed entries + tablesDiff.dropped.forEach(table -> metadataRefs.remove(table.id)); + viewsDiff.dropped.forEach(view -> metadataRefs.remove(view.metadata.id)); + + indexesDiff.entriesOnlyOnLeft() + .values() + .forEach(indexTable -> indexMetadataRefs.remove(Pair.create(indexTable.keyspace, indexTable.indexName().get()))); + + // load up new entries + tablesDiff.created.forEach(table -> metadataRefs.put(table.id, new TableMetadataRef(table))); + viewsDiff.created.forEach(view -> metadataRefs.put(view.metadata.id, new TableMetadataRef(view.metadata))); + + indexesDiff.entriesOnlyOnRight() + .values() + .forEach(indexTable -> indexMetadataRefs.put(Pair.create(indexTable.keyspace, indexTable.indexName().get()), new TableMetadataRef(indexTable))); + + // refresh refs to updated ones + tablesDiff.altered.forEach(diff -> metadataRefs.get(diff.after.id).set(diff.after)); + viewsDiff.altered.forEach(diff -> metadataRefs.get(diff.after.metadata.id).set(diff.after.metadata)); + + indexesDiff.entriesDiffering() + .values() + .stream() + .map(MapDifference.ValueDifference::rightValue) + .forEach(indexTable -> indexMetadataRefs.get(Pair.create(indexTable.keyspace, indexTable.indexName().get())).set(indexTable)); + + SchemaDiagnostics.metadataReloaded(this, previous, updated, tablesDiff, viewsDiff, indexesDiff); + } + + public void registerListener(SchemaChangeListener listener) + { + changeListeners.add(listener); + } + + @SuppressWarnings("unused") + public void unregisterListener(SchemaChangeListener listener) + { + changeListeners.remove(listener); + } + + /** + * Get keyspace instance by name + * + * @param keyspaceName The name of the keyspace + * + * @return Keyspace object or null if keyspace was not found + */ + @Override + public Keyspace getKeyspaceInstance(String keyspaceName) + { + return keyspaceInstances.get(keyspaceName); + } + + public ColumnFamilyStore getColumnFamilyStoreInstance(TableId id) + { + TableMetadata metadata = getTableMetadata(id); + if (metadata == null) + return null; + + Keyspace 
instance = getKeyspaceInstance(metadata.keyspace); + if (instance == null) + return null; + + return instance.hasColumnFamilyStore(metadata.id) + ? instance.getColumnFamilyStore(metadata.id) + : null; + } + + /** + * Store given Keyspace instance to the schema + * + * @param keyspace The Keyspace instance to store + * + * @throws IllegalArgumentException if Keyspace is already stored + */ + @Override + public void storeKeyspaceInstance(Keyspace keyspace) + { + if (keyspaceInstances.putIfAbsent(keyspace.getName(), keyspace) != null) + throw new IllegalArgumentException(String.format("Keyspace %s was already initialized.", keyspace.getName())); + } + + /** + * Remove keyspace from schema + * + * @param keyspaceName The name of the keyspace to remove + * + * @return removed keyspace instance or null if it wasn't found + */ + public Keyspace removeKeyspaceInstance(String keyspaceName) + { + return keyspaceInstances.remove(keyspaceName); + } + + public Keyspaces snapshot() + { + return keyspaces; + } + + /** + * Remove keyspace definition from system + * + * @param ksm The keyspace definition to remove + */ + synchronized void unload(KeyspaceMetadata ksm) + { + keyspaces = keyspaces.without(ksm.name); + + ksm.tablesAndViews() + .forEach(t -> metadataRefs.remove(t.id)); + + ksm.tables + .indexTables() + .keySet() + .forEach(name -> indexMetadataRefs.remove(Pair.create(ksm.name, name))); + + SchemaDiagnostics.metadataRemoved(this, ksm); + } + + public int getNumberOfTables() + { + return keyspaces.stream().mapToInt(k -> size(k.tablesAndViews())).sum(); + } + + public ViewMetadata getView(String keyspaceName, String viewName) + { + assert keyspaceName != null; + KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName); + return (ksm == null) ? null : ksm.views.getNullable(viewName); + } + + /** + * Get metadata about keyspace by its name + * + * @param keyspaceName The name of the keyspace + * + * @return The keyspace metadata or null if it wasn't found + */ + @Override + public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName) + { + assert keyspaceName != null; + KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName); + return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName); + } + + private Set<String> getNonSystemKeyspacesSet() + { + return Sets.difference(keyspaces.names(), SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES); + } + + /** + * @return collection of the non-system keyspaces (note that this count as system only the + * non replicated keyspaces, so keyspace like system_traces which are replicated are actually + * returned. 
See getUserKeyspace() below if you don't want those) + */ + public ImmutableList<String> getNonSystemKeyspaces() + { + return ImmutableList.copyOf(getNonSystemKeyspacesSet()); + } + + /** + * @return a collection of keyspaces that do not use LocalStrategy for replication + */ + public List<String> getNonLocalStrategyKeyspaces() + { + return keyspaces.stream() + .filter(keyspace -> keyspace.params.replication.klass != LocalStrategy.class) + .map(keyspace -> keyspace.name) + .collect(Collectors.toList()); + } + + /** + * @return collection of the user defined keyspaces + */ + public List<String> getUserKeyspaces() + { + return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES)); + } + + /** + * Get metadata about keyspace inner ColumnFamilies + * + * @param keyspaceName The name of the keyspace + * + * @return metadata about ColumnFamilies the belong to the given keyspace + */ + public Iterable<TableMetadata> getTablesAndViews(String keyspaceName) + { + assert keyspaceName != null; + KeyspaceMetadata ksm = keyspaces.getNullable(keyspaceName); + assert ksm != null; + return ksm.tablesAndViews(); + } + + /** + * @return collection of the all keyspace names registered in the system (system and non-system) + */ + public Set<String> getKeyspaces() + { + return keyspaces.names(); + } + + /* TableMetadata/Ref query/control methods */ + + /** + * Given a keyspace name and table/view name, get the table metadata + * reference. If the keyspace name or table/view name is not present + * this method returns null. + * + * @return TableMetadataRef object or null if it wasn't found + */ + @Override + public TableMetadataRef getTableMetadataRef(String keyspace, String table) + { + TableMetadata tm = getTableMetadata(keyspace, table); + return tm == null + ? null + : metadataRefs.get(tm.id); + } + + public TableMetadataRef getIndexTableMetadataRef(String keyspace, String index) + { + return indexMetadataRefs.get(Pair.create(keyspace, index)); + } + + Map<Pair<String, String>, TableMetadataRef> getIndexTableMetadataRefs() + { + return indexMetadataRefs; + } + + /** + * Get Table metadata by its identifier + * + * @param id table or view identifier + * + * @return metadata about Table or View + */ + @Override + public TableMetadataRef getTableMetadataRef(TableId id) + { + return metadataRefs.get(id); + } + + @Override + public TableMetadataRef getTableMetadataRef(Descriptor descriptor) + { + return getTableMetadataRef(descriptor.ksname, descriptor.cfname); + } + + Map<TableId, TableMetadataRef> getTableMetadataRefs() + { + return metadataRefs; + } + + /** + * Given a keyspace name and table name, get the table + * meta data. If the keyspace name or table name is not valid + * this function returns null. + * + * @param keyspace The keyspace name + * @param table The table name + * + * @return TableMetadata object or null if it wasn't found + */ + public TableMetadata getTableMetadata(String keyspace, String table) + { + assert keyspace != null; + assert table != null; + + KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace); + return ksm == null + ? null + : ksm.getTableOrViewNullable(table); + } + + @Override + public TableMetadata getTableMetadata(TableId id) + { + TableMetadata table = keyspaces.getTableOrViewNullable(id); + return null != table ? 
table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id); + } + + public TableMetadata validateTable(String keyspaceName, String tableName) + { + if (tableName.isEmpty()) + throw new InvalidRequestException("non-empty table is required"); + + KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName); + if (keyspace == null) + throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName)); + + TableMetadata metadata = keyspace.getTableOrViewNullable(tableName); + if (metadata == null) + throw new InvalidRequestException(format("table %s does not exist", tableName)); + + return metadata; + } + + public TableMetadata getTableMetadata(Descriptor descriptor) + { + return getTableMetadata(descriptor.ksname, descriptor.cfname); + } + + /* Function helpers */ + + /** + * Get all function overloads with the specified name + * + * @param name fully qualified function name + * @return an empty list if the keyspace or the function name are not found; + * a non-empty collection of {@link Function} otherwise + */ + public Collection<Function> getFunctions(FunctionName name) + { + if (!name.hasKeyspace()) + throw new IllegalArgumentException(String.format("Function name must be fully qualified: got %s", name)); + + KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace); + return ksm == null + ? Collections.emptyList() + : ksm.functions.get(name); + } + + /** + * Find the function with the specified name + * + * @param name fully qualified function name + * @param argTypes function argument types + * @return an empty {@link Optional} if the keyspace or the function name are not found; + * a non-empty optional of {@link Function} otherwise + */ + public Optional<Function> findFunction(FunctionName name, List<AbstractType<?>> argTypes) + { + if (!name.hasKeyspace()) + throw new IllegalArgumentException(String.format("Function name must be fully quallified: got %s", name)); + + KeyspaceMetadata ksm = getKeyspaceMetadata(name.keyspace); + return ksm == null + ? Optional.empty() + : ksm.functions.find(name, argTypes); + } + + /* Version control */ + + /** + * @return current schema version + */ + public UUID getVersion() + { + return version; + } + + /** + * Checks whether the given schema version is the same as the current local schema. + */ + public boolean isSameVersion(UUID schemaVersion) + { + return schemaVersion != null && schemaVersion.equals(version); + } + + /** + * Checks whether the current schema is empty. + */ + public boolean isEmpty() + { + return SchemaConstants.emptyVersion.equals(version); + } + + /** + * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest + * will be converted into UUID which would act as content-based version of the schema. + */ + public void updateVersion() + { + version = SchemaKeyspace.calculateSchemaDigest(); + SystemKeyspace.updateSchemaVersion(version); + SchemaDiagnostics.versionUpdated(this); + } + + /* + * Like updateVersion, but also announces via gossip + */ + public void updateVersionAndAnnounce() + { + updateVersion(); + passiveAnnounceVersion(); + } + + /** + * Announce my version passively over gossip. + * Used to notify nodes as they arrive in the cluster. + */ + private void passiveAnnounceVersion() + { + Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); + SchemaDiagnostics.versionAnnounced(this); + } + + /** + * Clear all KS/CF metadata and reset version. 
+ */ + public synchronized void clear() + { + getNonSystemKeyspaces().forEach(k -> unload(getKeyspaceMetadata(k))); + updateVersionAndAnnounce(); + SchemaDiagnostics.schemataCleared(this); + } + + /* + * Reload schema from local disk. Useful if a user made changes to schema tables by hand, or has suspicion that + * in-memory representation got out of sync somehow with what's on disk. + */ + public synchronized void reloadSchemaAndAnnounceVersion() + { + Keyspaces before = keyspaces.filter(k -> !SchemaConstants.isLocalSystemKeyspace(k.name)); + Keyspaces after = SchemaKeyspace.fetchNonSystemKeyspaces(); + merge(Keyspaces.diff(before, after)); + updateVersionAndAnnounce(); + } + + /** + * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects + * (which also involves fs operations on add/drop ks/cf) + * + * @param mutations the schema changes to apply + * + * @throws ConfigurationException If one of metadata attributes has invalid value + */ - synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations) ++ public synchronized void mergeAndAnnounceVersion(Collection<Mutation> mutations) + { + merge(mutations); + updateVersionAndAnnounce(); + } + + public synchronized TransformationResult transform(SchemaTransformation transformation, boolean locally, long now) + { + KeyspacesDiff diff; + try + { + Keyspaces before = keyspaces; + Keyspaces after = transformation.apply(before); + diff = Keyspaces.diff(before, after); + } + catch (RuntimeException e) + { + return new TransformationResult(e); + } + + if (diff.isEmpty()) + return new TransformationResult(diff, Collections.emptyList()); + + Collection<Mutation> mutations = SchemaKeyspace.convertSchemaDiffToMutations(diff, now); + SchemaKeyspace.applyChanges(mutations); + + merge(diff); + updateVersion(); + if (!locally) + passiveAnnounceVersion(); + + return new TransformationResult(diff, mutations); + } + + public static final class TransformationResult + { + public final boolean success; + public final RuntimeException exception; + public final KeyspacesDiff diff; + public final Collection<Mutation> mutations; + + private TransformationResult(boolean success, RuntimeException exception, KeyspacesDiff diff, Collection<Mutation> mutations) + { + this.success = success; + this.exception = exception; + this.diff = diff; + this.mutations = mutations; + } + + TransformationResult(RuntimeException exception) + { + this(false, exception, null, null); + } + + TransformationResult(KeyspacesDiff diff, Collection<Mutation> mutations) + { + this(true, null, diff, mutations); + } + } + + synchronized void merge(Collection<Mutation> mutations) + { + // only compare the keyspaces affected by this set of schema mutations + Set<String> affectedKeyspaces = SchemaKeyspace.affectedKeyspaces(mutations); + + // fetch the current state of schema for the affected keyspaces only + Keyspaces before = keyspaces.filter(k -> affectedKeyspaces.contains(k.name)); + + // apply the schema mutations + SchemaKeyspace.applyChanges(mutations); + + // apply the schema mutations and fetch the new versions of the altered keyspaces + Keyspaces after = SchemaKeyspace.fetchKeyspaces(affectedKeyspaces); + + merge(Keyspaces.diff(before, after)); + } + + private void merge(KeyspacesDiff diff) + { + diff.dropped.forEach(this::dropKeyspace); + diff.created.forEach(this::createKeyspace); + diff.altered.forEach(this::alterKeyspace); + } + + private void alterKeyspace(KeyspaceDiff delta) + { + SchemaDiagnostics.keyspaceAltering(this, delta); + + // drop 
tables and views + delta.views.dropped.forEach(this::dropView); + delta.tables.dropped.forEach(this::dropTable); + + load(delta.after); + + // add tables and views + delta.tables.created.forEach(this::createTable); + delta.views.created.forEach(this::createView); + + // update tables and views + delta.tables.altered.forEach(diff -> alterTable(diff.after)); + delta.views.altered.forEach(diff -> alterView(diff.after)); + + // deal with all added, and altered views + Keyspace.open(delta.after.name).viewManager.reload(true); + + // notify on everything dropped + delta.udas.dropped.forEach(uda -> notifyDropAggregate((UDAggregate) uda)); + delta.udfs.dropped.forEach(udf -> notifyDropFunction((UDFunction) udf)); + delta.views.dropped.forEach(this::notifyDropView); + delta.tables.dropped.forEach(this::notifyDropTable); + delta.types.dropped.forEach(this::notifyDropType); + + // notify on everything created + delta.types.created.forEach(this::notifyCreateType); + delta.tables.created.forEach(this::notifyCreateTable); + delta.views.created.forEach(this::notifyCreateView); + delta.udfs.created.forEach(udf -> notifyCreateFunction((UDFunction) udf)); + delta.udas.created.forEach(uda -> notifyCreateAggregate((UDAggregate) uda)); + + // notify on everything altered + if (!delta.before.params.equals(delta.after.params)) + notifyAlterKeyspace(delta.before, delta.after); + delta.types.altered.forEach(diff -> notifyAlterType(diff.before, diff.after)); + delta.tables.altered.forEach(diff -> notifyAlterTable(diff.before, diff.after)); + delta.views.altered.forEach(diff -> notifyAlterView(diff.before, diff.after)); + delta.udfs.altered.forEach(diff -> notifyAlterFunction(diff.before, diff.after)); + delta.udas.altered.forEach(diff -> notifyAlterAggregate(diff.before, diff.after)); + SchemaDiagnostics.keyspaceAltered(this, delta); + } + + private void createKeyspace(KeyspaceMetadata keyspace) + { + SchemaDiagnostics.keyspaceCreating(this, keyspace); + load(keyspace); + Keyspace.open(keyspace.name); + + notifyCreateKeyspace(keyspace); + keyspace.types.forEach(this::notifyCreateType); + keyspace.tables.forEach(this::notifyCreateTable); + keyspace.views.forEach(this::notifyCreateView); + keyspace.functions.udfs().forEach(this::notifyCreateFunction); + keyspace.functions.udas().forEach(this::notifyCreateAggregate); + SchemaDiagnostics.keyspaceCreated(this, keyspace); + } + + private void dropKeyspace(KeyspaceMetadata keyspace) + { + SchemaDiagnostics.keyspaceDroping(this, keyspace); + keyspace.views.forEach(this::dropView); + keyspace.tables.forEach(this::dropTable); + + // remove the keyspace from the static instances. + Keyspace.clear(keyspace.name); + unload(keyspace); + Keyspace.writeOrder.awaitNewBarrier(); + + keyspace.functions.udas().forEach(this::notifyDropAggregate); + keyspace.functions.udfs().forEach(this::notifyDropFunction); + keyspace.views.forEach(this::notifyDropView); + keyspace.tables.forEach(this::notifyDropTable); + keyspace.types.forEach(this::notifyDropType); + notifyDropKeyspace(keyspace); + SchemaDiagnostics.keyspaceDroped(this, keyspace); + } + + private void dropView(ViewMetadata metadata) + { + Keyspace.open(metadata.keyspace()).viewManager.dropView(metadata.name()); + dropTable(metadata.metadata); + } + + private void dropTable(TableMetadata metadata) + { + SchemaDiagnostics.tableDropping(this, metadata); + ColumnFamilyStore cfs = Keyspace.open(metadata.keyspace).getColumnFamilyStore(metadata.name); + assert cfs != null; + // make sure all the indexes are dropped, or else. 
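+                // (a sketch of the intent, not authoritative: the table's secondary indexes are
+                //  registered on its ColumnFamilyStore, so they are marked removed here before the
+                //  CFS itself is snapshotted, recycled out of the commit log and dropped below)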
+ cfs.indexManager.markAllIndexesRemoved(); + CompactionManager.instance.interruptCompactionFor(Collections.singleton(metadata), (sstable) -> true, true); + if (DatabaseDescriptor.isAutoSnapshot()) + cfs.snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(cfs.name, ColumnFamilyStore.SNAPSHOT_DROP_PREFIX)); + CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); + Keyspace.open(metadata.keyspace).dropCf(metadata.id); + SchemaDiagnostics.tableDropped(this, metadata); + } + + private void createTable(TableMetadata table) + { + SchemaDiagnostics.tableCreating(this, table); + Keyspace.open(table.keyspace).initCf(metadataRefs.get(table.id), true); + SchemaDiagnostics.tableCreated(this, table); + } + + private void createView(ViewMetadata view) + { + Keyspace.open(view.keyspace()).initCf(metadataRefs.get(view.metadata.id), true); + } + + private void alterTable(TableMetadata updated) + { + SchemaDiagnostics.tableAltering(this, updated); + Keyspace.open(updated.keyspace).getColumnFamilyStore(updated.name).reload(); + SchemaDiagnostics.tableAltered(this, updated); + } + + private void alterView(ViewMetadata updated) + { + Keyspace.open(updated.keyspace()).getColumnFamilyStore(updated.name()).reload(); + } + + private void notifyCreateKeyspace(KeyspaceMetadata ksm) + { + changeListeners.forEach(l -> l.onCreateKeyspace(ksm.name)); + } + + private void notifyCreateTable(TableMetadata metadata) + { + changeListeners.forEach(l -> l.onCreateTable(metadata.keyspace, metadata.name)); + } + + private void notifyCreateView(ViewMetadata view) + { + changeListeners.forEach(l -> l.onCreateView(view.keyspace(), view.name())); + } + + private void notifyCreateType(UserType ut) + { + changeListeners.forEach(l -> l.onCreateType(ut.keyspace, ut.getNameAsString())); + } + + private void notifyCreateFunction(UDFunction udf) + { + changeListeners.forEach(l -> l.onCreateFunction(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void notifyCreateAggregate(UDAggregate udf) + { + changeListeners.forEach(l -> l.onCreateAggregate(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void notifyAlterKeyspace(KeyspaceMetadata before, KeyspaceMetadata after) + { + changeListeners.forEach(l -> l.onAlterKeyspace(after.name)); + } + + private void notifyAlterTable(TableMetadata before, TableMetadata after) + { + boolean changeAffectedPreparedStatements = before.changeAffectsPreparedStatements(after); + changeListeners.forEach(l -> l.onAlterTable(after.keyspace, after.name, changeAffectedPreparedStatements)); + } + + private void notifyAlterView(ViewMetadata before, ViewMetadata after) + { + boolean changeAffectedPreparedStatements = before.metadata.changeAffectsPreparedStatements(after.metadata); + changeListeners.forEach(l ->l.onAlterView(after.keyspace(), after.name(), changeAffectedPreparedStatements)); + } + + private void notifyAlterType(UserType before, UserType after) + { + changeListeners.forEach(l -> l.onAlterType(after.keyspace, after.getNameAsString())); + } + + private void notifyAlterFunction(UDFunction before, UDFunction after) + { + changeListeners.forEach(l -> l.onAlterFunction(after.name().keyspace, after.name().name, after.argTypes())); + } + + private void notifyAlterAggregate(UDAggregate before, UDAggregate after) + { + changeListeners.forEach(l -> l.onAlterAggregate(after.name().keyspace, after.name().name, after.argTypes())); + } + + private void notifyDropKeyspace(KeyspaceMetadata ksm) + { + changeListeners.forEach(l -> 
l.onDropKeyspace(ksm.name)); + } + + private void notifyDropTable(TableMetadata metadata) + { + changeListeners.forEach(l -> l.onDropTable(metadata.keyspace, metadata.name)); + } + + private void notifyDropView(ViewMetadata view) + { + changeListeners.forEach(l -> l.onDropView(view.keyspace(), view.name())); + } + + private void notifyDropType(UserType ut) + { + changeListeners.forEach(l -> l.onDropType(ut.keyspace, ut.getNameAsString())); + } + + private void notifyDropFunction(UDFunction udf) + { + changeListeners.forEach(l -> l.onDropFunction(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + private void notifyDropAggregate(UDAggregate udf) + { + changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes())); + } + + + /** + * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null} + * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty) schema. + */ + public static String schemaVersionToString(UUID version) + { + return version == null + ? "unknown" + : SchemaConstants.emptyVersion.equals(version) + ? "(empty)" + : version.toString(); + } +} diff --cc src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java index 0e6b7a3,0000000..a984804 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java +++ b/src/java/org/apache/cassandra/schema/SchemaMigrationEvent.java @@@ -1,114 -1,0 +1,111 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.schema; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import javax.annotation.Nullable; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.diag.DiagnosticEvent; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; + +/** + * Internal events emitted by {@link MigrationManager}. 
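+ * Each event records the local schema and messaging versions and, when an endpoint is involved,
+ * that endpoint's reported schema version, messaging version, liveness and gossip-only membership,
+ * so schema propagation can be traced through the diagnostics framework.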
+ */ +final class SchemaMigrationEvent extends DiagnosticEvent +{ + private final MigrationManagerEventType type; + @Nullable + private final InetAddressAndPort endpoint; + @Nullable + private final UUID endpointSchemaVersion; + private final UUID localSchemaVersion; + private final Integer localMessagingVersion; + private final SystemKeyspace.BootstrapState bootstrapState; - @Nullable - private Integer inflightTaskCount; ++ private final Integer inflightTaskCount; + @Nullable + private Integer endpointMessagingVersion; + @Nullable + private Boolean endpointGossipOnlyMember; + @Nullable + private Boolean isAlive; + + enum MigrationManagerEventType + { + UNKNOWN_LOCAL_SCHEMA_VERSION, + VERSION_MATCH, + SKIP_PULL, + RESET_LOCAL_SCHEMA, + TASK_CREATED, + TASK_SEND_ABORTED, + TASK_REQUEST_SEND + } + + SchemaMigrationEvent(MigrationManagerEventType type, + @Nullable InetAddressAndPort endpoint, @Nullable UUID endpointSchemaVersion) + { + this.type = type; + this.endpoint = endpoint; + this.endpointSchemaVersion = endpointSchemaVersion; + + localSchemaVersion = Schema.instance.getVersion(); + localMessagingVersion = MessagingService.current_version; + - Queue<CountDownLatch> inflightTasks = MigrationTask.getInflightTasks(); - if (inflightTasks != null) - inflightTaskCount = inflightTasks.size(); ++ inflightTaskCount = MigrationCoordinator.instance.getInflightTasks(); + + this.bootstrapState = SystemKeyspace.getBootstrapState(); + + if (endpoint == null) return; + + if (MessagingService.instance().versions.knows(endpoint)) + endpointMessagingVersion = MessagingService.instance().versions.getRaw(endpoint); + + endpointGossipOnlyMember = Gossiper.instance.isGossipOnlyMember(endpoint); + this.isAlive = FailureDetector.instance.isAlive(endpoint); + } + + public Enum<?> getType() + { + return type; + } + + public Map<String, Serializable> toMap() + { + HashMap<String, Serializable> ret = new HashMap<>(); + if (endpoint != null) ret.put("endpoint", endpoint.getHostAddressAndPort()); + ret.put("endpointSchemaVersion", Schema.schemaVersionToString(endpointSchemaVersion)); + ret.put("localSchemaVersion", Schema.schemaVersionToString(localSchemaVersion)); + if (endpointMessagingVersion != null) ret.put("endpointMessagingVersion", endpointMessagingVersion); + if (localMessagingVersion != null) ret.put("localMessagingVersion", localMessagingVersion); + if (endpointGossipOnlyMember != null) ret.put("endpointGossipOnlyMember", endpointGossipOnlyMember); + if (isAlive != null) ret.put("endpointIsAlive", isAlive); + if (bootstrapState != null) ret.put("bootstrapState", bootstrapState.name()); + if (inflightTaskCount != null) ret.put("inflightTaskCount", inflightTaskCount); + return ret; + } +} diff --cc src/java/org/apache/cassandra/service/StorageService.java index ca2bff2,734f176..4a3477c --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -30,13 -29,10 +30,14 @@@ import java.util.concurrent.atomic.Atom import java.util.regex.MatchResult; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; ++import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; @@@ -91,17 -82,16 +92,18 @@@ import 
org.apache.cassandra.repair.* import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.service.paxos.CommitVerbHandler; -import org.apache.cassandra.service.paxos.PrepareVerbHandler; -import org.apache.cassandra.service.paxos.ProposeVerbHandler; ++import org.apache.cassandra.schema.MigrationCoordinator; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.streaming.*; -import org.apache.cassandra.thrift.EndpointDetails; -import org.apache.cassandra.thrift.TokenRange; -import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; @@@ -135,8 -116,10 +137,11 @@@ public class StorageService extends Not { private static final Logger logger = LoggerFactory.getLogger(StorageService.class); + public static final int INDEFINITE = -1; public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized + public static final int SCHEMA_DELAY = getRingDelay(); // delay after which we assume ring has stablized + + private static final boolean REQUIRE_SCHEMAS = !Boolean.getBoolean("cassandra.skip_schema_check"); private final JMXProgressSupport progressSupport = new JMXProgressSupport(this); @@@ -149,11 -141,23 +154,25 @@@ return Integer.parseInt(newdelay); } else + { return 30 * 1000; + } } + private static int getSchemaDelay() + { + String newdelay = System.getProperty("cassandra.schema_delay_ms"); + if (newdelay != null) + { + logger.info("Overriding SCHEMA_DELAY to {}ms", newdelay); + return Integer.parseInt(newdelay); + } + else + { + return 30 * 1000; + } + } + /* This abstraction maintains the token/endpoint metadata information */ private TokenMetadata tokenMetadata = new TokenMetadata(); @@@ -774,11 -824,13 +793,12 @@@ public static boolean isSeed() { - return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()); + return DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()); } - @VisibleForTesting - public void prepareToJoin() throws ConfigurationException + private void prepareToJoin() throws ConfigurationException { + MigrationCoordinator.instance.start(); if (!joined) { Map<ApplicationState, VersionedValue> appStates = new EnumMap<>(ApplicationState.class); @@@ -880,26 -932,23 +900,31 @@@ } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } - // if our schema hasn't matched yet, wait until it has - // we do this by waiting for all in-flight migration requests and responses to complete - // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) - if (!MigrationManager.isReadyForBootstrap()) - { - setMode(Mode.JOINING, "waiting for schema information to complete", true); - MigrationManager.waitUntilReadyForBootstrap(); - } 
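+            // the MigrationCoordinator now tracks every schema version reported via gossip; block
+            // for up to SCHEMA_DELAY waiting for all of them to be pulled and merged before proceeding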
+ + boolean schemasReceived = MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(SCHEMA_DELAY)); + + if (schemasReceived) + return; + + logger.warn(String.format("There are nodes in the cluster with a different schema version than us we did not merged schemas from, " + + "our version : (%s), outstanding versions -> endpoints : %s", + Schema.instance.getVersion(), + MigrationCoordinator.instance.outstandingVersions())); + + if (REQUIRE_SCHEMAS) + throw new RuntimeException("Didn't receive schemas for all known versions within the timeout"); } + private void joinTokenRing(long schemaTimeoutMillis) throws ConfigurationException + { + joinTokenRing(!isSurveyMode, shouldBootstrap(), schemaTimeoutMillis, INDEFINITE); + } + @VisibleForTesting - private void joinTokenRing(int delay) throws ConfigurationException + public void joinTokenRing(boolean finishJoiningRing, + boolean shouldBootstrap, + long schemaTimeoutMillis, + long bootstrapTimeoutMillis) throws ConfigurationException { joined = true; @@@ -2233,11 -2104,11 +2258,11 @@@ } break; case SCHEMA: - SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value), executor); + SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value)); - MigrationManager.instance.scheduleSchemaPull(endpoint, epState); + MigrationCoordinator.instance.reportEndpointVersion(endpoint, UUID.fromString(value.value)); break; case HOST_ID: - SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value), executor); + SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); break; case RPC_READY: notifyRpcChange(endpoint, epState.isRpcReady()); @@@ -3126,13 -2866,11 +3151,11 @@@ { onChange(endpoint, entry.getKey(), entry.getValue()); } - MigrationManager.instance.scheduleSchemaPull(endpoint, epState); + MigrationCoordinator.instance.reportEndpointVersion(endpoint, epState); } - public void onAlive(InetAddress endpoint, EndpointState state) + public void onAlive(InetAddressAndPort endpoint, EndpointState state) { - MigrationManager.instance.scheduleSchemaPull(endpoint, state); - if (tokenMetadata.isMember(endpoint)) notifyUp(endpoint); } @@@ -5683,42 -5381,9 +5706,58 @@@ } @Override + public void enableFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, String archiveCommand, int maxArchiveRetries) + { + FullQueryLoggerOptions fqlOptions = DatabaseDescriptor.getFullQueryLogOptions(); + path = path != null ? path : fqlOptions.log_dir; + rollCycle = rollCycle != null ? rollCycle : fqlOptions.roll_cycle; + blocking = blocking != null ? blocking : fqlOptions.block; + maxQueueWeight = maxQueueWeight != Integer.MIN_VALUE ? maxQueueWeight : fqlOptions.max_queue_weight; + maxLogSize = maxLogSize != Long.MIN_VALUE ? maxLogSize : fqlOptions.max_log_size; + archiveCommand = archiveCommand != null ? archiveCommand : fqlOptions.archive_command; + maxArchiveRetries = maxArchiveRetries != Integer.MIN_VALUE ? 
maxArchiveRetries : fqlOptions.max_archive_retries; + + Preconditions.checkNotNull(path, "cassandra.yaml did not set log_dir and not set as parameter"); + FullQueryLogger.instance.enable(Paths.get(path), rollCycle, blocking, maxQueueWeight, maxLogSize, archiveCommand, maxArchiveRetries); + } + + @Override + public void resetFullQueryLogger() + { + FullQueryLogger.instance.reset(DatabaseDescriptor.getFullQueryLogOptions().log_dir); + } + + @Override + public void stopFullQueryLogger() + { + FullQueryLogger.instance.stop(); + } + + @Override + public boolean isFullQueryLogEnabled() + { + return FullQueryLogger.instance.isEnabled(); + } + + @Override + public CompositeData getFullQueryLoggerOptions() + { + return FullQueryLoggerOptionsCompositeData.toCompositeData(FullQueryLogger.instance.getFullQueryLoggerOptions()); + } ++ ++ @Override + public Map<String, Set<InetAddress>> getOutstandingSchemaVersions() + { - Map<UUID, Set<InetAddress>> outstanding = MigrationCoordinator.instance.outstandingVersions(); - return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), Entry::getValue)); ++ Map<UUID, Set<InetAddressAndPort>> outstanding = MigrationCoordinator.instance.outstandingVersions(); ++ return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), ++ e -> e.getValue().stream().map(i -> i.address).collect(Collectors.toSet()))); ++ } ++ ++ @Override ++ public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort() ++ { ++ Map<UUID, Set<InetAddressAndPort>> outstanding = MigrationCoordinator.instance.outstandingVersions(); ++ return outstanding.entrySet().stream().collect(Collectors.toMap(e -> e.getKey().toString(), ++ e -> e.getValue().stream().map(InetAddressAndPort::toString).collect(Collectors.toSet()))); + } } diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index 618a37e,218cdd9..0c4b5b0 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -22,19 -21,16 +22,21 @@@ import java.io.Serializable import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; -import java.util.Collection; import java.util.List; import java.util.Map; + import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; - +import javax.annotation.Nullable; import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import org.apache.cassandra.db.ColumnFamilyStoreMBean; +import org.apache.cassandra.exceptions.ConfigurationException; ++import org.apache.cassandra.locator.InetAddressAndPort; + public interface StorageServiceMBean extends NotificationEmitter { /** @@@ -750,73 -707,9 +752,78 @@@ */ public boolean resumeBootstrap(); - /** Returns the max version that this node will negotiate for native protocol connections */ - public int getMaxNativeProtocolVersion(); + /** Gets the concurrency settings for processing stages*/ + static class StageConcurrency implements Serializable + { + public final int corePoolSize; + public final int maximumPoolSize; + + public StageConcurrency(int corePoolSize, int maximumPoolSize) + { + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + } + + } + public Map<String, List<Integer>> getConcurrency(List<String> stageNames); + + /** Sets the concurrency setting 
for processing stages */ + public void setConcurrency(String threadPoolName, int newCorePoolSize, int newMaximumPoolSize); + + /** Clears the history of clients that have connected in the past **/ + void clearConnectionHistory(); + public void disableAuditLog(); + public void enableAuditLog(String loggerName, Map<String, String> parameters, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException; + public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException; + public boolean isAuditLogEnabled(); + public String getCorruptedTombstoneStrategy(); + public void setCorruptedTombstoneStrategy(String strategy); + + /** + * Start the fully query logger. + * @param path Path where the full query log will be stored. If null cassandra.yaml value is used. + * @param rollCycle How often to create a new file for query data (MINUTELY, DAILY, HOURLY) + * @param blocking Whether threads submitting queries to the query log should block if they can't be drained to the filesystem or alternatively drops samples and log + * @param maxQueueWeight How many bytes of query data to queue before blocking or dropping samples + * @param maxLogSize How many bytes of log data to store before dropping segments. Might not be respected if a log file hasn't rolled so it can be deleted. + * @param archiveCommand executable archiving the rolled log files, + * @param maxArchiveRetries max number of times to retry a failing archive command + * + */ + public void enableFullQueryLogger(String path, String rollCycle, Boolean blocking, int maxQueueWeight, long maxLogSize, @Nullable String archiveCommand, int maxArchiveRetries); + + /** + * Disable the full query logger if it is enabled. + * Also delete any generated files in the last used full query log path as well as the one configure in cassandra.yaml + */ + public void resetFullQueryLogger(); + + /** + * Stop logging queries but leave any generated files on disk. + */ + public void stopFullQueryLogger(); + + public boolean isFullQueryLogEnabled(); + + /** + * Returns the current state of FQL. 
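+     * The returned CompositeData reflects the active FullQueryLoggerOptions (log directory, roll
+     * cycle, blocking mode, max queue weight, max log size and archiving settings).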
+ */ + CompositeData getFullQueryLoggerOptions(); + + /** Sets the initial allocation size of backing arrays for new RangeTombstoneList objects */ + public void setInitialRangeTombstoneListAllocationSize(int size); + + /** Returns the initial allocation size of backing arrays for new RangeTombstoneList objects */ + public int getInitialRangeTombstoneListAllocationSize(); + + /** Sets the resize factor to use when growing/resizing a RangeTombstoneList */ + public void setRangeTombstoneListResizeGrowthFactor(double growthFactor); + + /** Returns the resize factor to use when growing/resizing a RangeTombstoneList */ + public double getRangeTombstoneResizeListGrowthFactor(); + + /** Returns a map of schema version -> list of endpoints reporting that version that we need schema updates for */ ++ @Deprecated + public Map<String, Set<InetAddress>> getOutstandingSchemaVersions(); ++ public Map<String, Set<String>> getOutstandingSchemaVersionsWithPort(); } diff --cc test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java index 70ef503,0000000..6dda98e mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java @@@ -1,457 -1,0 +1,461 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.action; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.shared.VersionedApplicationState; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.schema.MigrationCoordinator; +import org.apache.cassandra.schema.MigrationManager; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; + +public class GossipHelper +{ + public static InstanceAction statusToBootstrap(IInvokableInstance newNode) + { + return (instance) -> + { + changeGossipState(instance, + newNode, + Arrays.asList(tokens(newNode), + statusBootstrapping(newNode), + statusWithPortBootstrapping(newNode))); + }; + } + + public static InstanceAction statusToNormal(IInvokableInstance peer) + { + return (target) -> + { + changeGossipState(target, + peer, + Arrays.asList(tokens(peer), + statusNormal(peer), + releaseVersion(peer), + netVersion(peer), + statusWithPortNormal(peer))); + }; + } + + /** + * This method is unsafe and should be used _only_ when gossip is not used or available: it creates versioned values on the + * target instance, which means Gossip versioning gets out of sync. Use a safe couterpart at all times when performing _any_ + * ring movement operations _or_ if Gossip is used. 
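+ * The safe counterpart is {@link #statusToNormal(IInvokableInstance)}, which builds the versioned
+ * values on the peer itself so that its gossip versioning stays consistent.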
+ */ + public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer) + { + int messagingVersion = peer.getMessagingVersion(); + changeGossipState(target, + peer, + Arrays.asList(unsafeVersionedValue(target, + ApplicationState.TOKENS, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.STATUS, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.STATUS_WITH_PORT, + (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens), + peer.config().getString("partitioner"), + peer.config().getString("initial_token")), + unsafeVersionedValue(target, + ApplicationState.NET_VERSION, + (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion), + peer.config().getString("partitioner")), + unsafeReleaseVersion(target, + peer.config().getString("partitioner"), + peer.getReleaseVersionString()))); + } + + public static InstanceAction statusToLeaving(IInvokableInstance newNode) + { + return (instance) -> { + changeGossipState(instance, + newNode, + Arrays.asList(tokens(newNode), + statusLeaving(newNode), + statusWithPortLeaving(newNode))); + }; + } + + public static InstanceAction bootstrap() + { + return new BootstrapAction(); + } + + public static InstanceAction bootstrap(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema) + { + return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema); + } + + public static InstanceAction disseminateGossipState(IInvokableInstance newNode) + { + return new DisseminateGossipState(newNode); + } + + public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom) + { + return new PullSchemaFrom(pullFrom); + } + + private static InstanceAction disableBinary() + { + return (instance) -> instance.nodetoolResult("disablebinary").asserts().success(); + } + + private static class DisseminateGossipState implements InstanceAction + { + final Map<InetSocketAddress, byte[]> gossipState; + + public DisseminateGossipState(IInvokableInstance... 
from) + { + gossipState = new HashMap<>(); + for (IInvokableInstance node : from) + { + byte[] epBytes = node.callsOnInstance(() -> { + EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort()); + return toBytes(epState); + }).call(); + gossipState.put(node.broadcastAddress(), epBytes); + } + } + + public void accept(IInvokableInstance instance) + { + instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress, byte[]>, Void>) + (map) -> { + Map<InetAddressAndPort, EndpointState> newState = new HashMap<>(); + for (Map.Entry<InetSocketAddress, byte[]> e : map.entrySet()) + newState.put(toCassandraInetAddressAndPort(e.getKey()), fromBytes(e.getValue())); + + Gossiper.runInGossipStageBlocking(() -> { + Gossiper.instance.applyStateLocally(newState); + }); + return null; + }).apply(gossipState); + } + } + + private static byte[] toBytes(EndpointState epState) + { + try (DataOutputBuffer out = new DataOutputBuffer(1024)) + { + EndpointState.serializer.serialize(epState, out, MessagingService.current_version); + return out.toByteArray(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + private static EndpointState fromBytes(byte[] bytes) + { + try (DataInputBuffer in = new DataInputBuffer(bytes)) + { + return EndpointState.serializer.deserialize(in, MessagingService.current_version); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + } + + private static class PullSchemaFrom implements InstanceAction + { + final InetSocketAddress pullFrom; + + public PullSchemaFrom(IInvokableInstance pullFrom) + { + this.pullFrom = pullFrom.broadcastAddress();; + } + + public void accept(IInvokableInstance pullTo) + { + pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> { + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(pullFrom); + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); - MigrationManager.scheduleSchemaPull(endpoint, state); - MigrationManager.waitUntilReadyForBootstrap(); ++ MigrationCoordinator.instance.reportEndpointVersion(endpoint, state); ++ MigrationCoordinator.instance.awaitSchemaRequests(TimeUnit.SECONDS.toMillis(10)); + }).accept(pullFrom); + } + } + + private static class BootstrapAction implements InstanceAction, Serializable + { + private final boolean joinRing; + private final Duration waitForBootstrap; + private final Duration waitForSchema; + + public BootstrapAction() + { + this(true, Duration.ofMinutes(10), Duration.ofSeconds(10)); + } + + public BootstrapAction(boolean joinRing, Duration waitForBootstrap, Duration waitForSchema) + { + this.joinRing = joinRing; + this.waitForBootstrap = waitForBootstrap; + this.waitForSchema = waitForSchema; + } + + public void accept(IInvokableInstance instance) + { + instance.appliesOnInstance((String partitionerString, String tokenString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + List<Token> tokens = Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString)); + try + { + Collection<InetAddressAndPort> collisions = StorageService.instance.prepareForBootstrap(waitForSchema.toMillis()); + assert collisions.size() == 0 : String.format("Didn't expect any replacements but got %s", collisions); + boolean isBootstrapSuccessful = StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis()); + assert isBootstrapSuccessful : "Bootstrap did not complete successfully"; + 
StorageService.instance.setUpDistributedSystemKeyspaces(); + if (joinRing) + StorageService.instance.finishJoiningRing(true, tokens); + } + catch (Throwable t) + { + throw new RuntimeException(t); + } + + return null; + }).apply(instance.config().getString("partitioner"), instance.config().getString("initial_token")); + } + } + + public static InstanceAction decomission() + { + return (target) -> target.nodetoolResult("decommission").asserts().success(); + } + + + public static VersionedApplicationState tokens(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.TOKENS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens)); + } + + public static VersionedApplicationState netVersion(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.NET_VERSION, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).networkVersion()); + } + + private static VersionedApplicationState unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String releaseVersionStr) + { + return unsafeVersionedValue(instance, ApplicationState.RELEASE_VERSION, (partitioner) -> new VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr), partitionerStr); + } + + public static VersionedApplicationState releaseVersion(IInvokableInstance instance) + { + return unsafeReleaseVersion(instance, instance.config().getString("partitioner"), instance.getReleaseVersionString()); + } + + public static VersionedApplicationState statusNormal(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens)); + } + + public static VersionedApplicationState statusWithPortNormal(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens)); + } + + public static VersionedApplicationState statusBootstrapping(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens)); + } + + public static VersionedApplicationState statusWithPortBootstrapping(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens)); + } + + public static VersionedApplicationState statusLeaving(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens)); + } + + public static VersionedApplicationState statusLeft(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> { + return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime); + }); + } + + public static VersionedApplicationState statusWithPortLeft(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> { + return new VersionedValue.VersionedValueFactory(partitioner).left(tokens, System.currentTimeMillis() + Gossiper.aVeryLongTime); + + }); + } + + public static VersionedApplicationState 
statusWithPortLeaving(IInvokableInstance instance) + { + return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens)); + } + + public static VersionedValue toVersionedValue(VersionedApplicationState vv) + { + return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version); + } + + public static ApplicationState toApplicationState(VersionedApplicationState vv) + { + return ApplicationState.values()[vv.applicationState]; + } + + private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance, + ApplicationState applicationState, + IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier, + String partitionerStr, String initialTokenStr) + { + return instance.appliesOnInstance((String partitionerString, String tokenString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + Token token = partitioner.getTokenFactory().fromString(tokenString); + + VersionedValue versionedValue = supplier.apply(partitioner, Collections.singleton(token)); + return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version); + }).apply(partitionerStr, initialTokenStr); + } + + private static VersionedApplicationState unsafeVersionedValue(IInvokableInstance instance, + ApplicationState applicationState, + IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier, + String partitionerStr) + { + return instance.appliesOnInstance((String partitionerString) -> { + IPartitioner partitioner = FBUtilities.newPartitioner(partitionerString); + VersionedValue versionedValue = supplier.apply(partitioner); + return new VersionedApplicationState(applicationState.ordinal(), versionedValue.value, versionedValue.version); + }).apply(partitionerStr); + } + + public static VersionedApplicationState versionedToken(IInvokableInstance instance, ApplicationState applicationState, IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, VersionedValue> supplier) + { + return unsafeVersionedValue(instance, applicationState, supplier, instance.config().getString("partitioner"), instance.config().getString("initial_token")); + } + + public static InstanceAction removeFromRing(IInvokableInstance peer) + { + return (target) -> { + InetAddressAndPort endpoint = toCassandraInetAddressAndPort(peer.broadcastAddress()); + VersionedApplicationState newState = statusLeft(peer); + + target.runOnInstance(() -> { + // state to 'left' + EndpointState currentState = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + ApplicationState as = toApplicationState(newState); + VersionedValue vv = toVersionedValue(newState); + currentState.addApplicationState(as, vv); + StorageService.instance.onChange(endpoint, as, vv); + + // remove from gossip + Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.unsafeAnulEndpoint(endpoint)); + SystemKeyspace.removeEndpoint(endpoint); + PendingRangeCalculatorService.instance.update(); + PendingRangeCalculatorService.instance.blockUntilFinished(); + }); + }; + } + + /** + * Changes gossip state of the `peer` on `target` + */ + public static void changeGossipState(IInvokableInstance target, IInstance peer, List<VersionedApplicationState> newState) + { + InetSocketAddress addr = peer.broadcastAddress(); + UUID hostId = peer.config().hostId(); + int netVersion = peer.getMessagingVersion(); + target.runOnInstance(() -> { + 
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr); + StorageService storageService = StorageService.instance; + + Gossiper.runInGossipStageBlocking(() -> { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state == null) + { + Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, netVersion, 1); + state = Gossiper.instance.getEndpointStateForEndpoint(endpoint); + if (state.isAlive() && !Gossiper.instance.isDeadState(state)) + Gossiper.instance.realMarkAlive(endpoint, state); + } + + for (VersionedApplicationState value : newState) + { + ApplicationState as = toApplicationState(value); + VersionedValue vv = toVersionedValue(value); + state.addApplicationState(as, vv); + storageService.onChange(endpoint, as, vv); + } + }); + }); + } + + public static void withProperty(String prop, boolean value, Runnable r) + { + String before = System.getProperty(prop); + try + { + System.setProperty(prop, Boolean.toString(value)); + r.run(); + } + finally + { - System.setProperty(prop, before == null ? "true" : before); ++ if (before == null) ++ System.clearProperty(prop); ++ else ++ System.setProperty(prop, before); + } + } +} diff --cc test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java index 0000000,0000000..daba0f1 new file mode 100644 --- /dev/null +++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java @@@ -1,0 -1,0 +1,319 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. 
++ */ ++ ++package org.apache.cassandra.schema; ++ ++import java.net.UnknownHostException; ++import java.util.Collection; ++import java.util.Collections; ++import java.util.HashSet; ++import java.util.LinkedList; ++import java.util.List; ++import java.util.Queue; ++import java.util.Set; ++import java.util.UUID; ++import java.util.concurrent.Future; ++ ++import com.google.common.collect.Iterables; ++import com.google.common.collect.Sets; ++import com.google.common.util.concurrent.Futures; ++import org.junit.Assert; ++import org.junit.Test; ++ ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import org.apache.cassandra.config.DatabaseDescriptor; ++import org.apache.cassandra.db.Mutation; ++import org.apache.cassandra.locator.InetAddressAndPort; ++import org.apache.cassandra.utils.concurrent.WaitQueue; ++ ++import static com.google.common.util.concurrent.Futures.getUnchecked; ++ ++public class MigrationCoordinatorTest ++{ ++ private static final Logger logger = LoggerFactory.getLogger(MigrationCoordinatorTest.class); ++ ++ private static final InetAddressAndPort EP1; ++ private static final InetAddressAndPort EP2; ++ private static final InetAddressAndPort EP3; ++ ++ private static final UUID LOCAL_VERSION = UUID.randomUUID(); ++ private static final UUID V1 = UUID.randomUUID(); ++ private static final UUID V2 = UUID.randomUUID(); ++ private static final UUID V3 = UUID.randomUUID(); ++ ++ static ++ { ++ try ++ { ++ EP1 = InetAddressAndPort.getByName("10.0.0.1"); ++ EP2 = InetAddressAndPort.getByName("10.0.0.2"); ++ EP3 = InetAddressAndPort.getByName("10.0.0.3"); ++ } ++ catch (UnknownHostException e) ++ { ++ throw new AssertionError(e); ++ } ++ ++ DatabaseDescriptor.daemonInitialization(); ++ } ++ ++ private static class InstrumentedCoordinator extends MigrationCoordinator ++ { ++ ++ Queue<Callback> requests = new LinkedList<>(); ++ @Override ++ protected void sendMigrationMessage(MigrationCoordinator.Callback callback) ++ { ++ requests.add(callback); ++ } ++ ++ boolean shouldPullSchema = true; ++ @Override ++ protected boolean shouldPullSchema(UUID version) ++ { ++ return shouldPullSchema; ++ } ++ ++ boolean shouldPullFromEndpoint = true; ++ @Override ++ protected boolean shouldPullFromEndpoint(InetAddressAndPort endpoint) ++ { ++ return shouldPullFromEndpoint; ++ } ++ ++ boolean shouldPullImmediately = true; ++ @Override ++ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version) ++ { ++ return shouldPullImmediately; ++ } ++ ++ Set<InetAddressAndPort> deadNodes = new HashSet<>(); ++ protected boolean isAlive(InetAddressAndPort endpoint) ++ { ++ return !deadNodes.contains(endpoint); ++ } ++ ++ UUID localVersion = LOCAL_VERSION; ++ @Override ++ protected boolean isLocalVersion(UUID version) ++ { ++ return localVersion.equals(version); ++ } ++ ++ int maxOutstandingRequests = 3; ++ @Override ++ protected int getMaxOutstandingVersionRequests() ++ { ++ return maxOutstandingRequests; ++ } ++ ++ Set<InetAddressAndPort> mergedSchemasFrom = new HashSet<>(); ++ @Override ++ protected void mergeSchemaFrom(InetAddressAndPort endpoint, Collection<Mutation> mutations) ++ { ++ mergedSchemasFrom.add(endpoint); ++ } ++ } ++ ++ @Test ++ public void requestResponseCycle() throws InterruptedException ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ coordinator.maxOutstandingRequests = 1; ++ ++ Assert.assertTrue(coordinator.requests.isEmpty()); ++ ++ // first schema report should send a migration request ++ 
getUnchecked(coordinator.reportEndpointVersion(EP1, V1)); ++ Assert.assertEquals(1, coordinator.requests.size()); ++ Assert.assertFalse(coordinator.awaitSchemaRequests(1)); ++ ++ // second should not ++ getUnchecked(coordinator.reportEndpointVersion(EP2, V1)); ++ Assert.assertEquals(1, coordinator.requests.size()); ++ Assert.assertFalse(coordinator.awaitSchemaRequests(1)); ++ ++ // until the first request fails, then the second endpoint should be contacted ++ MigrationCoordinator.Callback request1 = coordinator.requests.poll(); ++ Assert.assertEquals(EP1, request1.endpoint); ++ getUnchecked(request1.fail()); ++ Assert.assertTrue(coordinator.mergedSchemasFrom.isEmpty()); ++ Assert.assertFalse(coordinator.awaitSchemaRequests(1)); ++ ++ // ... then the second endpoint should be contacted ++ Assert.assertEquals(1, coordinator.requests.size()); ++ MigrationCoordinator.Callback request2 = coordinator.requests.poll(); ++ Assert.assertEquals(EP2, request2.endpoint); ++ Assert.assertFalse(coordinator.awaitSchemaRequests(1)); ++ getUnchecked(request2.response(Collections.emptyList())); ++ Assert.assertEquals(EP2, Iterables.getOnlyElement(coordinator.mergedSchemasFrom)); ++ Assert.assertTrue(coordinator.awaitSchemaRequests(1)); ++ ++ // and migration tasks should not be sent out for subsequent version reports ++ getUnchecked(coordinator.reportEndpointVersion(EP3, V1)); ++ Assert.assertTrue(coordinator.requests.isEmpty()); ++ ++ } ++ ++ /** ++ * If we don't send a request for a version, and endpoints associated with ++ * it all change versions, we should signal anyone waiting on that version ++ */ ++ @Test ++ public void versionsAreSignaledWhenDeleted() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ coordinator.reportEndpointVersion(EP1, V1); ++ WaitQueue.Signal signal = coordinator.getVersionInfoUnsafe(V1).register(); ++ Assert.assertFalse(signal.isSignalled()); ++ ++ coordinator.reportEndpointVersion(EP1, V2); ++ Assert.assertNull(coordinator.getVersionInfoUnsafe(V1)); ++ ++ Assert.assertTrue(signal.isSignalled()); ++ } ++ ++ private static void assertNoContact(InstrumentedCoordinator coordinator, InetAddressAndPort endpoint, UUID version, boolean startupShouldBeUnblocked) ++ { ++ Assert.assertTrue(coordinator.requests.isEmpty()); ++ Future<Void> future = coordinator.reportEndpointVersion(EP1, V1); ++ if (future != null) ++ getUnchecked(future); ++ Assert.assertTrue(coordinator.requests.isEmpty()); ++ ++ Assert.assertEquals(startupShouldBeUnblocked, coordinator.awaitSchemaRequests(1)); ++ } ++ ++ private static void assertNoContact(InstrumentedCoordinator coordinator, boolean startupShouldBeUnblocked) ++ { ++ assertNoContact(coordinator, EP1, V1, startupShouldBeUnblocked); ++ } ++ ++ @Test ++ public void dontContactNodesWithSameSchema() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ coordinator.localVersion = V1; ++ assertNoContact(coordinator, true); ++ } ++ ++ @Test ++ public void dontContactIncompatibleNodes() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ coordinator.shouldPullFromEndpoint = false; ++ assertNoContact(coordinator, false); ++ } ++ ++ @Test ++ public void dontContactDeadNodes() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ coordinator.deadNodes.add(EP1); ++ assertNoContact(coordinator, EP1, V1, false); ++ } ++ ++ /** ++ * If a node has become incompativle between when the task was scheduled and when it ++ * was run, we should detect that 
and fail the task ++ */ ++ @Test ++ public void testGossipRace() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator() { ++ protected boolean shouldPullImmediately(InetAddressAndPort endpoint, UUID version) ++ { ++ // this is the last thing that gets called before scheduling the pull, so set this flag here ++ shouldPullFromEndpoint = false; ++ return super.shouldPullImmediately(endpoint, version); ++ } ++ }; ++ ++ Assert.assertTrue(coordinator.shouldPullFromEndpoint(EP1)); ++ assertNoContact(coordinator, EP1, V1, false); ++ } ++ ++ @Test ++ public void testWeKeepSendingRequests() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ getUnchecked(coordinator.reportEndpointVersion(EP3, V2)); ++ coordinator.requests.remove().response(Collections.emptyList()); ++ ++ getUnchecked(coordinator.reportEndpointVersion(EP1, V1)); ++ getUnchecked(coordinator.reportEndpointVersion(EP2, V1)); ++ ++ MigrationCoordinator.Callback prev = null; ++ Set<InetAddressAndPort> EPs = Sets.newHashSet(EP1, EP2); ++ int ep1requests = 0; ++ int ep2requests = 0; ++ ++ for (int i=0; i<10; i++) ++ { ++ Assert.assertEquals(String.format("%s", i), 2, coordinator.requests.size()); ++ ++ MigrationCoordinator.Callback next = coordinator.requests.remove(); ++ ++ // we should be contacting endpoints in a round robin fashion ++ Assert.assertTrue(EPs.contains(next.endpoint)); ++ if (prev != null && prev.endpoint.equals(next.endpoint)) ++ Assert.fail(String.format("Not expecting prev %s to be equal to next %s", prev.endpoint, next.endpoint)); ++ ++ // should send a new request ++ next.fail(); ++ prev = next; ++ Assert.assertFalse(coordinator.awaitSchemaRequests(1)); ++ ++ Assert.assertEquals(2, coordinator.requests.size()); ++ } ++ logger.info("{} -> {}", EP1, ep1requests); ++ logger.info("{} -> {}", EP2, ep2requests); ++ ++ // a single success should unblock startup though ++ coordinator.requests.remove().response(Collections.emptyList()); ++ Assert.assertTrue(coordinator.awaitSchemaRequests(1)); ++ ++ } ++ ++ /** ++ * Pull unreceived schemas should detect and send requests out for any ++ * schemas that are marked unreceived and have no outstanding requests ++ */ ++ @Test ++ public void pullUnreceived() ++ { ++ InstrumentedCoordinator coordinator = new InstrumentedCoordinator(); ++ ++ coordinator.shouldPullFromEndpoint = false; ++ assertNoContact(coordinator, false); ++ ++ coordinator.shouldPullFromEndpoint = true; ++ Assert.assertEquals(0, coordinator.requests.size()); ++ List<Future<Void>> futures = coordinator.pullUnreceivedSchemaVersions(); ++ futures.forEach(Futures::getUnchecked); ++ Assert.assertEquals(1, coordinator.requests.size()); ++ } ++} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org