Repository: cassandra
Updated Branches:
  refs/heads/trunk 8bc2fba3e -> 13150b001
Wait for schema agreement prior to building MVs

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-14571


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13150b00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13150b00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13150b00

Branch: refs/heads/trunk
Commit: 13150b001a8ddf82a77ac9525c446b7e9e32325c
Parents: 8bc2fba
Author: Aleksey Yeshchenko <[email protected]>
Authored: Wed Jul 18 22:55:23 2018 +0100
Committer: Sam Tunnicliffe <[email protected]>
Committed: Fri Jul 20 09:52:51 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/view/ViewBuilderTask.java      | 11 +++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 47 ++++++++++++++++++++
 3 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 63831d7..fe4087b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
  * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
  * Bump the hints messaging version to match the current one (CASSANDRA-14536)
  * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index 1b16f18..453cb62 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.StorageProxy;
@@ -122,6 +124,15 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
         else
             logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
 
+        /*
+         * It's possible for view building to start before MV creation got propagated to other nodes. For this reason
+         * we should wait for schema to converge before attempting to send any view mutations to other nodes, or else
+         * face UnknownTableException upon Mutation deserialization on the nodes that haven't processed the schema change.
+         */
+        boolean schemaConverged = Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> this.isStopped);
+        if (!schemaConverged)
+            logger.warn("Failed to get schema to converge before building view {}.{}", baseCfs.keyspace.getName(), view.name);
+
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 3975187..7bb2583 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
@@ -1833,4 +1834,50 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         logger.info("No gossip backlog; proceeding");
     }
 
+    /**
+     * Blockingly wait for all live nodes to agree on the current schema version.
+     *
+     * @param maxWait maximum time to wait for schema agreement
+     * @param unit TimeUnit of maxWait
+     * @return true if agreement was reached, false if not
+     */
+    public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition)
+    {
+        int waited = 0;
+        int toWait = 50;
+
+        Set<InetAddressAndPort> members = getLiveTokenOwners();
+
+        while (true)
+        {
+            if (nodesAgreeOnSchema(members))
+                return true;
+
+            if (waited >= unit.toMillis(maxWait) || abortCondition.getAsBoolean())
+                return false;
+
+            Uninterruptibles.sleepUninterruptibly(toWait, TimeUnit.MILLISECONDS);
+            waited += toWait;
+            toWait = Math.min(1000, toWait * 2);
+        }
+    }
+
+    private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
+    {
+        UUID expectedVersion = null;
+
+        for (InetAddressAndPort node : nodes)
+        {
+            EndpointState state = getEndpointStateForEndpoint(node);
+            UUID remoteVersion = state.getSchemaVersion();
+
+            if (null == expectedVersion)
+                expectedVersion = remoteVersion;
+
+            if (null == expectedVersion || !expectedVersion.equals(remoteVersion))
+                return false;
+        }
+
+        return true;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
