http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index f6545b0..40f1b84 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ b/src/java/org/apache/cassandra/db/view/View.java @@ -25,7 +25,7 @@ import javax.annotation.Nullable; import com.google.common.collect.Iterables; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.cql3.selection.RawSelector; import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; @@ -34,7 +34,6 @@ import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ViewMetadata; -import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,19 +55,13 @@ public class View public volatile List<ColumnMetadata> baseNonPKColumnsInViewPK; private ViewBuilder builder; - // Only the raw statement can be final, because the statement cannot always be prepared when the MV is initialized. - // For example, during startup, this view will be initialized as part of the Keyspace.open() work; preparing a statement - // also requires the keyspace to be open, so this results in double-initialization problems. 
- private final SelectStatement.RawStatement rawSelect; private SelectStatement select; private ReadQuery query; - public View(ViewMetadata definition, - ColumnFamilyStore baseCfs) + public View(ViewMetadata definition, ColumnFamilyStore baseCfs) { this.baseCfs = baseCfs; - this.name = definition.name; - this.rawSelect = definition.select; + this.name = definition.name(); updateDefinition(definition); } @@ -160,31 +153,52 @@ public class View * Returns the SelectStatement used to populate and filter this view. Internal users should access the select * statement this way to ensure it has been prepared. */ - public SelectStatement getSelectStatement() + SelectStatement getSelectStatement() { - if (select == null) + if (null == select) { - ClientState state = ClientState.forInternalCalls(); - state.setKeyspace(baseCfs.keyspace.getName()); - rawSelect.prepareKeyspace(state); - ParsedStatement.Prepared prepared = rawSelect.prepare(true); - select = (SelectStatement) prepared.statement; + SelectStatement.Parameters parameters = + new SelectStatement.Parameters(Collections.emptyMap(), + Collections.emptyList(), + false, + true, + false); + + SelectStatement.RawStatement rawSelect = + new SelectStatement.RawStatement(new QualifiedName(baseCfs.keyspace.getName(), baseCfs.name), + parameters, + selectClause(), + definition.whereClause, + null, + null); + + rawSelect.setBindVariables(Collections.emptyList()); + + select = rawSelect.prepare(true); } return select; } + private List<RawSelector> selectClause() + { + return definition.metadata + .columns() + .stream() + .map(c -> c.name.toString()) + .map(ColumnMetadata.Raw::forQuoted) + .map(c -> new RawSelector(c, null)) + .collect(Collectors.toList()); + } + /** * Returns the ReadQuery used to filter this view. Internal users should access the query this way to ensure it * has been prepared. 
*/ - public ReadQuery getReadQuery() + ReadQuery getReadQuery() { if (query == null) - { query = getSelectStatement().getQuery(QueryOptions.forInternalCalls(Collections.emptyList()), FBUtilities.nowInSeconds()); - logger.trace("View query: {}", rawSelect); - } return query; } @@ -216,63 +230,13 @@ public class View return (view == null) ? null : Schema.instance.getTableMetadataRef(view.baseTableId); } + // TODO: REMOVE public static Iterable<ViewMetadata> findAll(String keyspace, String baseTable) { KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace); return Iterables.filter(ksm.views, view -> view.baseTableName.equals(baseTable)); } - /** - * Builds the string text for a materialized view's SELECT statement. - */ - public static String buildSelectStatement(String cfName, Collection<ColumnMetadata> includedColumns, String whereClause) - { - StringBuilder rawSelect = new StringBuilder("SELECT "); - if (includedColumns == null || includedColumns.isEmpty()) - rawSelect.append("*"); - else - rawSelect.append(includedColumns.stream().map(id -> id.name.toCQLString()).collect(Collectors.joining(", "))); - rawSelect.append(" FROM \"").append(cfName).append("\" WHERE ") .append(whereClause).append(" ALLOW FILTERING"); - return rawSelect.toString(); - } - - public static String relationsToWhereClause(List<Relation> whereClause) - { - List<String> expressions = new ArrayList<>(whereClause.size()); - for (Relation rel : whereClause) - { - StringBuilder sb = new StringBuilder(); - - if (rel.isMultiColumn()) - { - sb.append(((MultiColumnRelation) rel).getEntities().stream() - .map(ColumnMetadata.Raw::toString) - .collect(Collectors.joining(", ", "(", ")"))); - } - else - { - sb.append(((SingleColumnRelation) rel).getEntity()); - } - - sb.append(" ").append(rel.operator()).append(" "); - - if (rel.isIN()) - { - sb.append(rel.getInValues().stream() - .map(Term.Raw::getText) - .collect(Collectors.joining(", ", "(", ")"))); - } - else - { - 
sb.append(rel.getValue().getText()); - } - - expressions.add(sb.toString()); - } - - return expressions.stream().collect(Collectors.joining(" AND ")); - } - public boolean hasSamePrimaryKeyColumnsAsBaseTable() { return baseNonPKColumnsInViewPK.isEmpty();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/db/view/ViewManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java index 8d12349..0d565ae 100644 --- a/src/java/org/apache/cassandra/db/view/ViewManager.java +++ b/src/java/org/apache/cassandra/db/view/ViewManager.java @@ -101,7 +101,7 @@ public class ViewManager Map<String, ViewMetadata> newViewsByName = Maps.newHashMapWithExpectedSize(views.size()); for (ViewMetadata definition : views) { - newViewsByName.put(definition.name, definition); + newViewsByName.put(definition.name(), definition); } for (String viewName : viewsByName.keySet()) @@ -147,14 +147,14 @@ public class ViewManager if (!keyspace.hasColumnFamilyStore(definition.baseTableId)) { logger.warn("Not adding view {} because the base table {} is unknown", - definition.name, + definition.name(), definition.baseTableId); return; } View view = new View(definition, keyspace.getColumnFamilyStore(definition.baseTableId)); forTable(view.getDefinition().baseTableId).add(view); - viewsByName.put(definition.name, view); + viewsByName.put(definition.name(), view); } public void removeView(String name) http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java index 204d9ee..4bbb861 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java @@ -31,8 +31,8 @@ import com.google.common.net.HostAndPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import 
org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.InvalidRequestException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 70aebbd..d765fac 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -50,7 +50,7 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.RowFilter; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/TargetParser.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/TargetParser.java b/src/java/org/apache/cassandra/index/TargetParser.java index bc679f2..9ada4c6 100644 --- a/src/java/org/apache/cassandra/index/TargetParser.java +++ b/src/java/org/apache/cassandra/index/TargetParser.java @@ -25,7 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.schema.ColumnMetadata; import 
org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.utils.Pair; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index fb0d629..76b7543 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -37,7 +37,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.RowFilter; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/index/sasi/SASIIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java index 76e5801..3ffcb6e 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndex.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndex.java @@ -25,7 +25,7 @@ import com.googlecode.concurrenttrees.common.Iterables; import org.apache.cassandra.config.*; import org.apache.cassandra.cql3.Operator; 
-import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 22296e8..0c2cf28 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -31,16 +31,16 @@ import java.util.stream.Collectors; import com.datastax.driver.core.ProtocolVersion; import com.datastax.driver.core.TypeCodec; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UpdateParameters; import org.apache.cassandra.cql3.functions.UDHelper; -import org.apache.cassandra.cql3.statements.CreateTableStatement; -import org.apache.cassandra.cql3.statements.CreateTypeStatement; import org.apache.cassandra.cql3.statements.ModificationStatement; -import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.cql3.statements.UpdateStatement; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.DecoratedKey; @@ -58,14 +58,12 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import 
org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaKeyspace; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.schema.Types; import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.Pair; /** * Utility to write SSTables. @@ -348,8 +346,8 @@ public class CQLSSTableWriter implements Closeable protected SSTableFormat.Type formatType = null; - private CreateTableStatement.RawStatement schemaStatement; - private final List<CreateTypeStatement> typeStatements; + private CreateTableStatement.Raw schemaStatement; + private final List<CreateTypeStatement.Raw> typeStatements; private ModificationStatement.Parsed insertStatement; private IPartitioner partitioner; @@ -398,7 +396,7 @@ public class CQLSSTableWriter implements Closeable public Builder withType(String typeDefinition) throws SyntaxException { - typeStatements.add(QueryProcessor.parseStatement(typeDefinition, CreateTypeStatement.class, "CREATE TYPE")); + typeStatements.add(QueryProcessor.parseStatement(typeDefinition, CreateTypeStatement.Raw.class, "CREATE TYPE")); return this; } @@ -418,7 +416,7 @@ public class CQLSSTableWriter implements Closeable */ public Builder forTable(String schema) { - this.schemaStatement = QueryProcessor.parseStatement(schema, CreateTableStatement.RawStatement.class, "CREATE TABLE"); + this.schemaStatement = QueryProcessor.parseStatement(schema, CreateTableStatement.Raw.class, "CREATE TABLE"); return this; } @@ -531,10 +529,9 @@ public class CQLSSTableWriter implements Closeable Functions.none())); } - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspaceName); - TableMetadata tableMetadata = ksm.tables.getNullable(schemaStatement.columnFamily()); + TableMetadata tableMetadata = 
ksm.tables.getNullable(schemaStatement.table()); if (tableMetadata == null) { Types types = createTypes(keyspaceName); @@ -542,24 +539,24 @@ public class CQLSSTableWriter implements Closeable Schema.instance.load(ksm.withSwapped(ksm.tables.with(tableMetadata)).withSwapped(types)); } - Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert = prepareInsert(); + UpdateStatement preparedInsert = prepareInsert(); TableMetadataRef ref = TableMetadataRef.forOfflineTools(tableMetadata); AbstractSSTableSimpleWriter writer = sorted - ? new SSTableSimpleWriter(directory, ref, preparedInsert.left.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, ref, preparedInsert.left.updatedColumns(), bufferSizeInMB); + ? new SSTableSimpleWriter(directory, ref, preparedInsert.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, ref, preparedInsert.updatedColumns(), bufferSizeInMB); if (formatType != null) writer.setSSTableFormatType(formatType); - return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right); + return new CQLSSTableWriter(writer, preparedInsert, preparedInsert.getBindVariables()); } } private Types createTypes(String keyspace) { Types.RawBuilder builder = Types.rawBuilder(keyspace); - for (CreateTypeStatement st : typeStatements) + for (CreateTypeStatement.Raw st : typeStatements) st.addToRawBuilder(builder); return builder.build(); } @@ -571,10 +568,11 @@ public class CQLSSTableWriter implements Closeable */ private TableMetadata createTable(Types types) { - CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(types).statement; + ClientState state = ClientState.forInternalCalls(); + CreateTableStatement statement = schemaStatement.prepare(state); statement.validate(ClientState.forInternalCalls()); - TableMetadata.Builder builder = statement.builder(); + TableMetadata.Builder builder = statement.builder(types); if (partitioner != null) builder.partitioner(partitioner); @@ -586,20 +584,20 @@ 
public class CQLSSTableWriter implements Closeable * * @return prepared Insert statement and it's bound names */ - private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert() + private UpdateStatement prepareInsert() { - ParsedStatement.Prepared cqlStatement = insertStatement.prepare(); - UpdateStatement insert = (UpdateStatement) cqlStatement.statement; - insert.validate(ClientState.forInternalCalls()); + ClientState state = ClientState.forInternalCalls(); + UpdateStatement insert = (UpdateStatement) insertStatement.prepare(state); + insert.validate(state); if (insert.hasConditions()) throw new IllegalArgumentException("Conditional statements are not supported"); if (insert.isCounter()) throw new IllegalArgumentException("Counter update statements are not supported"); - if (cqlStatement.boundNames.isEmpty()) + if (insert.getBindVariables().isEmpty()) throw new IllegalArgumentException("Provided insert statement has no bind variables"); - return Pair.create(insert, cqlStatement.boundNames); + return insert; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 4097715..90c0146 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -64,6 +64,7 @@ import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PreviewKind; @@ -715,7 +716,7 @@ public class 
RepairRunnable extends WrappedRunnable implements ProgressEventNoti String format = "select event_id, source, source_port, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;"; String query = String.format(format, SchemaConstants.TRACE_KEYSPACE_NAME, TraceKeyspace.EVENTS); - SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare(ClientState.forInternalCalls()); ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); InetAddressAndPort source = FBUtilities.getBroadcastAddressAndPort(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java index 19d83db..a85a1e5 100644 --- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java +++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.cql3.statements.CreateTableStatement; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/ColumnMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/ColumnMetadata.java b/src/java/org/apache/cassandra/schema/ColumnMetadata.java index 
0380b35..b6e743b 100644 --- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java +++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java @@ -23,7 +23,6 @@ import java.util.function.Predicate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; import com.google.common.collect.Collections2; import org.apache.cassandra.cql3.*; @@ -263,12 +262,29 @@ public final class ColumnMetadata extends ColumnSpecification implements Selecta ColumnMetadata cd = (ColumnMetadata) o; - return Objects.equal(ksName, cd.ksName) - && Objects.equal(cfName, cd.cfName) - && Objects.equal(name, cd.name) - && Objects.equal(type, cd.type) - && Objects.equal(kind, cd.kind) - && Objects.equal(position, cd.position); + return equalsWithoutType(cd) && type.equals(cd.type); + } + + private boolean equalsWithoutType(ColumnMetadata other) + { + return name.equals(other.name) + && kind == other.kind + && position == other.position + && ksName.equals(other.ksName) + && cfName.equals(other.cfName); + } + + Optional<Difference> compare(ColumnMetadata other) + { + if (!equalsWithoutType(other)) + return Optional.of(Difference.SHALLOW); + + if (type.equals(other.type)) + return Optional.empty(); + + return type.asCQL3Type().toString().equals(other.type.asCQL3Type().toString()) + ? 
Optional.of(Difference.DEEP) + : Optional.of(Difference.SHALLOW); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/CompressionParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/CompressionParams.java b/src/java/org/apache/cassandra/schema/CompressionParams.java index b96334b..d644c56 100644 --- a/src/java/org/apache/cassandra/schema/CompressionParams.java +++ b/src/java/org/apache/cassandra/schema/CompressionParams.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.builder.EqualsBuilder; @@ -84,6 +85,7 @@ public final class CompressionParams private final double minCompressRatio; // In configuration we store min ratio, the input parameter. private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be used by the compressor + // TODO: deprecated, should now be carefully removed. 
Doesn't affect schema code as it isn't included in equals() and hashCode() private volatile double crcCheckChance = 1.0; public static CompressionParams fromMap(Map<String, String> opts) @@ -548,20 +550,17 @@ public final class CompressionParams public boolean equals(Object obj) { if (obj == this) - { return true; - } - else if (obj == null || obj.getClass() != getClass()) - { + + if (!(obj instanceof CompressionParams)) return false; - } CompressionParams cp = (CompressionParams) obj; - return new EqualsBuilder() - .append(sstableCompressor, cp.sstableCompressor) - .append(chunkLength(), cp.chunkLength()) - .append(otherOptions, cp.otherOptions) - .isEquals(); + + return Objects.equal(sstableCompressor, cp.sstableCompressor) + && chunkLength == cp.chunkLength + && otherOptions.equals(cp.otherOptions) + && minCompressRatio == cp.minCompressRatio; } @Override @@ -569,8 +568,9 @@ public final class CompressionParams { return new HashCodeBuilder(29, 1597) .append(sstableCompressor) - .append(chunkLength()) + .append(chunkLength) .append(otherOptions) + .append(minCompressRatio) .toHashCode(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Diff.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Diff.java b/src/java/org/apache/cassandra/schema/Diff.java new file mode 100644 index 0000000..36c0687 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/Diff.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.Iterables; + +public class Diff<T extends Iterable, S> +{ + public final T created; + public final T dropped; + public final ImmutableCollection<Altered<S>> altered; + + Diff(T created, T dropped, ImmutableCollection<Altered<S>> altered) + { + this.created = created; + this.dropped = dropped; + this.altered = altered; + } + + boolean isEmpty() + { + return Iterables.isEmpty(created) && Iterables.isEmpty(dropped) && Iterables.isEmpty(altered); + } + + Iterable<Altered<S>> altered(Difference kind) + { + return Iterables.filter(altered, a -> a.kind == kind); + } + + public static final class Altered<T> + { + public final T before; + public final T after; + public final Difference kind; + + Altered(T before, T after, Difference kind) + { + this.before = before; + this.after = after; + this.kind = kind; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Difference.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Difference.java b/src/java/org/apache/cassandra/schema/Difference.java new file mode 100644 index 0000000..4f1aea9 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/Difference.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +public enum Difference +{ + /** + * Two schema objects are considered to differ DEEP-ly if one or more of their nested schema objects differ. + * + * For example, if a table T has a column c of type U, where U is a user defined type, then upon altering U table + * T0 (before alter) will differ DEEP-ly from table T1 (after alter). + */ + DEEP, + + /** + * + * Two schema objects are considered to differ SHALLOW-ly if their direct structure is altered. + * + * For example, if a table T is altered to add a new column, a different compaction strategy, or a new description, + * then it will differ SHALLOW-ly from the original. 
+ */ + SHALLOW +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Functions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Functions.java b/src/java/org/apache/cassandra/schema/Functions.java index 8e3a3f1..2a0111c 100644 --- a/src/java/org/apache/cassandra/schema/Functions.java +++ b/src/java/org/apache/cassandra/schema/Functions.java @@ -17,19 +17,20 @@ */ package org.apache.cassandra.schema; +import java.nio.ByteBuffer; import java.util.*; -import java.util.stream.Collectors; +import java.util.function.Predicate; import java.util.stream.Stream; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; +import com.google.common.collect.*; import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.db.marshal.UserType; -import static com.google.common.collect.Iterables.filter; +import static java.util.stream.Collectors.toList; + +import static com.google.common.collect.Iterables.any; /** * An immutable container for a keyspace's UDAs and UDFs (and, in case of {@link org.apache.cassandra.db.SystemKeyspace}, @@ -37,6 +38,21 @@ import static com.google.common.collect.Iterables.filter; */ public final class Functions implements Iterable<Function> { + public enum Filter implements Predicate<Function> + { + ALL, UDF, UDA; + + public boolean test(Function function) + { + switch (this) + { + case UDF: return function instanceof UDFunction; + case UDA: return function instanceof UDAggregate; + default: return true; + } + } + } + private final ImmutableMultimap<FunctionName, Function> functions; private Functions(Builder builder) @@ -69,12 +85,17 @@ public final class Functions implements Iterable<Function> return functions.values().stream(); } + public 
int size() + { + return functions.size(); + } + /** * @return a stream of keyspace's UDFs */ public Stream<UDFunction> udfs() { - return stream().filter(f -> f instanceof UDFunction).map(f -> (UDFunction) f); + return stream().filter(Filter.UDF).map(f -> (UDFunction) f); } /** @@ -82,38 +103,32 @@ public final class Functions implements Iterable<Function> */ public Stream<UDAggregate> udas() { - return stream().filter(f -> f instanceof UDAggregate).map(f -> (UDAggregate) f); + return stream().filter(Filter.UDA).map(f -> (UDAggregate) f); } - MapDifference<Pair<FunctionName, List<String>>, UDFunction> udfsDiff(Functions other) + public Iterable<Function> referencingUserType(ByteBuffer name) { - Map<Pair<FunctionName, List<String>>, UDFunction> before = new HashMap<>(); - udfs().forEach(f -> before.put(Pair.create(f.name(), f.argumentsList()), f)); - - Map<Pair<FunctionName, List<String>>, UDFunction> after = new HashMap<>(); - other.udfs().forEach(f -> after.put(Pair.create(f.name(), f.argumentsList()), f)); - - return Maps.difference(before, after); + return Iterables.filter(this, f -> f.referencesUserType(name)); } - MapDifference<Pair<FunctionName, List<String>>, UDAggregate> udasDiff(Functions other) + public Functions withUpdatedUserType(UserType udt) { - Map<Pair<FunctionName, List<String>>, UDAggregate> before = new HashMap<>(); - udas().forEach(f -> before.put(Pair.create(f.name(), f.argumentsList()), f)); + if (!any(this, f -> f.referencesUserType(udt.name))) + return this; - Map<Pair<FunctionName, List<String>>, UDAggregate> after = new HashMap<>(); - other.udas().forEach(f -> after.put(Pair.create(f.name(), f.argumentsList()), f)); + Collection<UDFunction> udfs = udfs().map(f -> f.withUpdatedUserType(udt)).collect(toList()); + Collection<UDAggregate> udas = udas().map(f -> f.withUpdatedUserType(udfs, udt)).collect(toList()); - return Maps.difference(before, after); + return builder().add(udfs).add(udas).build(); } /** - * @return a collection of 
aggregates that use the provided function as either a state or a final function + * @return a stream of aggregates that use the provided function as either a state or a final function * @param function the referree function */ - public Collection<UDAggregate> aggregatesUsingFunction(Function function) + public Stream<UDAggregate> aggregatesUsingFunction(Function function) { - return udas().filter(uda -> uda.hasReferenceTo(function)).collect(Collectors.toList()); + return udas().filter(uda -> uda.hasReferenceTo(function)); } /** @@ -127,6 +142,11 @@ public final class Functions implements Iterable<Function> return functions.get(name); } + public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes) + { + return find(name, argTypes, Filter.ALL); + } + /** * Find the function with the specified name * @@ -134,13 +154,18 @@ public final class Functions implements Iterable<Function> * @param argTypes function argument types * @return an empty {@link Optional} if the function name is not found; a non-empty optional of {@link Function} otherwise */ - public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes) + public Optional<Function> find(FunctionName name, List<AbstractType<?>> argTypes, Filter filter) { return get(name).stream() - .filter(fun -> typesMatch(fun.argTypes(), argTypes)) + .filter(filter.and(fun -> typesMatch(fun.argTypes(), argTypes))) .findAny(); } + public boolean isEmpty() + { + return functions.isEmpty(); + } + /* * We need to compare the CQL3 representation of the type because comparing * the AbstractType will fail for example if a UDT has been changed. @@ -154,7 +179,7 @@ public final class Functions implements Iterable<Function> * or * ALTER TYPE foo RENAME ... 
*/ - public static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2) + private static boolean typesMatch(AbstractType<?> t1, AbstractType<?> t2) { return t1.freeze().asCQL3Type().toString().equals(t2.freeze().asCQL3Type().toString()); } @@ -184,6 +209,13 @@ public final class Functions implements Iterable<Function> return h; } + public Functions filter(Predicate<Function> predicate) + { + Builder builder = builder(); + stream().filter(predicate).forEach(builder::add); + return builder.build(); + } + /** * Create a Functions instance with the provided function added */ @@ -203,7 +235,19 @@ public final class Functions implements Iterable<Function> Function fun = find(name, argTypes).orElseThrow(() -> new IllegalStateException(String.format("Function %s doesn't exists", name))); - return builder().add(filter(this, f -> f != fun)).build(); + return without(fun); + } + + public Functions without(Function function) + { + return builder().add(Iterables.filter(this, f -> f != function)).build(); + } + + public Functions withAddedOrUpdated(Function function) + { + return builder().add(Iterables.filter(this, f -> !(f.name().equals(function.name()) && Functions.typesMatch(f.argTypes(), function.argTypes())))) + .add(function) + .build(); } @Override @@ -252,10 +296,52 @@ public final class Functions implements Iterable<Function> return this; } - public Builder add(Iterable<? extends Function> funs) + public Builder add(Iterable<? 
extends Function> funs) { funs.forEach(this::add); return this; } } + + @SuppressWarnings("unchecked") + static FunctionsDiff<UDFunction> udfsDiff(Functions before, Functions after) + { + return (FunctionsDiff<UDFunction>) FunctionsDiff.diff(before, after, Filter.UDF); + } + + @SuppressWarnings("unchecked") + static FunctionsDiff<UDAggregate> udasDiff(Functions before, Functions after) + { + return (FunctionsDiff<UDAggregate>) FunctionsDiff.diff(before, after, Filter.UDA); + } + + public static final class FunctionsDiff<T extends Function> extends Diff<Functions, T> + { + static final FunctionsDiff NONE = new FunctionsDiff<>(Functions.none(), Functions.none(), ImmutableList.of()); + + private FunctionsDiff(Functions created, Functions dropped, ImmutableCollection<Altered<T>> altered) + { + super(created, dropped, altered); + } + + private static FunctionsDiff diff(Functions before, Functions after, Filter filter) + { + if (before == after) + return NONE; + + Functions created = after.filter(filter.and(k -> !before.find(k.name(), k.argTypes(), filter).isPresent())); + Functions dropped = before.filter(filter.and(k -> !after.find(k.name(), k.argTypes(), filter).isPresent())); + + ImmutableList.Builder<Altered<Function>> altered = ImmutableList.builder(); + before.stream().filter(filter).forEach(functionBefore -> + { + after.find(functionBefore.name(), functionBefore.argTypes(), filter).ifPresent(functionAfter -> + { + functionBefore.compare(functionAfter).ifPresent(kind -> altered.add(new Altered<>(functionBefore, functionAfter, kind))); + }); + }); + + return new FunctionsDiff<>(created, dropped, altered.build()); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/IndexMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java index 190871a..3020793 100644 
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java +++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.statements.IndexTarget; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.UnknownIndexException; import org.apache.cassandra.index.Index; @@ -98,12 +98,14 @@ public final class IndexMetadata return name != null && !name.isEmpty() && PATTERN_WORD_CHARS.matcher(name).matches(); } - public static String getDefaultIndexName(String cfName, String root) + public static String generateDefaultIndexName(String table, ColumnIdentifier column) { - if (root == null) - return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + "idx").replaceAll(""); - else - return PATTERN_NON_WORD_CHAR.matcher(cfName + "_" + root + "_idx").replaceAll(""); + return PATTERN_NON_WORD_CHAR.matcher(table + "_" + column.toString() + "_idx").replaceAll(""); + } + + public static String generateDefaultIndexName(String table) + { + return PATTERN_NON_WORD_CHAR.matcher(table + "_" + "idx").replaceAll(""); } public void validate(TableMetadata table) http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Indexes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java index 6122197..a83be4b 100644 --- a/src/java/org/apache/cassandra/schema/Indexes.java +++ b/src/java/org/apache/cassandra/schema/Indexes.java @@ -23,8 +23,6 @@ import java.util.stream.Stream; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; -import org.apache.cassandra.exceptions.ConfigurationException; - import static java.lang.String.format; 
import static com.google.common.collect.Iterables.filter; @@ -169,20 +167,6 @@ public final class Indexes implements Iterable<IndexMetadata> public void validate(TableMetadata table) { - /* - * Index name check is duplicated in Keyspaces, for the time being. - * The reason for this is that schema altering statements are not calling - * Keyspaces.validate() as of yet. TODO: remove this once they do (on CASSANDRA-9425 completion) - */ - Set<String> indexNames = Sets.newHashSetWithExpectedSize(indexesByName.size()); - for (IndexMetadata index : indexesByName.values()) - { - if (indexNames.contains(index.name)) - throw new ConfigurationException(format("Duplicate index name %s for table %s", index.name, table)); - - indexNames.add(index.name); - } - indexesByName.values().forEach(i -> i.validate(table)); } @@ -198,20 +182,6 @@ public final class Indexes implements Iterable<IndexMetadata> return indexesByName.values().toString(); } - public static String getAvailableIndexName(String ksName, String cfName, String indexNameRoot) - { - - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); - Set<String> existingNames = ksm == null ? 
new HashSet<>() : ksm.existingIndexNames(null); - String baseName = IndexMetadata.getDefaultIndexName(cfName, indexNameRoot); - String acceptedName = baseName; - int i = 0; - while (existingNames.contains(acceptedName)) - acceptedName = baseName + '_' + (++i); - - return acceptedName; - } - public static final class Builder { final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java index 5a72d2c..aacd962 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java @@ -27,10 +27,22 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.Iterables; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.schema.Functions.FunctionsDiff; +import org.apache.cassandra.schema.Tables.TablesDiff; +import org.apache.cassandra.schema.Types.TypesDiff; +import org.apache.cassandra.schema.Views.ViewsDiff; +import org.apache.cassandra.service.StorageService; import static java.lang.String.format; +import static com.google.common.collect.Iterables.any; + /** * An immutable representation of keyspace metadata (name, params, tables, types, and functions). 
*/ @@ -110,9 +122,24 @@ public final class KeyspaceMetadata return kind == Kind.VIRTUAL; } + /** + * Returns a new KeyspaceMetadata with all instances of old UDT replaced with the updated version. + * Replaces all instances in tables, views, types, and functions. + */ + public KeyspaceMetadata withUpdatedUserType(UserType udt) + { + return new KeyspaceMetadata(name, + kind, + params, + tables.withUpdatedUserType(udt), + views.withUpdatedUserTypes(udt), + types.withUpdatedUserType(udt), + functions.withUpdatedUserType(udt)); + } + public Iterable<TableMetadata> tablesAndViews() { - return Iterables.concat(tables, views.metadatas()); + return Iterables.concat(tables, views.allTableMetadata()); } @Nullable @@ -124,14 +151,34 @@ public final class KeyspaceMetadata : view.metadata; } - public Set<String> existingIndexNames(String cfToExclude) + public boolean hasTable(String tableName) { - Set<String> indexNames = new HashSet<>(); - for (TableMetadata table : tables) - if (cfToExclude == null || !table.name.equals(cfToExclude)) - for (IndexMetadata index : table.indexes) - indexNames.add(index.name); - return indexNames; + return tables.get(tableName).isPresent(); + } + + public boolean hasView(String viewName) + { + return views.get(viewName).isPresent(); + } + + public boolean hasIndex(String indexName) + { + return any(tables, t -> t.indexes.has(indexName)); + } + + public String findAvailableIndexName(String baseName) + { + if (!hasIndex(baseName)) + return baseName; + + int i = 1; + do + { + String name = baseName + '_' + i++; + if (!hasIndex(name)) + return name; + } + while (true); } public Optional<TableMetadata> findIndexedTable(String indexName) @@ -209,4 +256,77 @@ public final class KeyspaceMetadata } } } + + public AbstractReplicationStrategy createReplicationStrategy() + { + return AbstractReplicationStrategy.createReplicationStrategy(name, + params.replication.klass, + StorageService.instance.getTokenMetadata(), + DatabaseDescriptor.getEndpointSnitch(), + 
params.replication.options); + } + + static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, KeyspaceMetadata after) + { + return KeyspaceDiff.diff(before, after); + } + + public static final class KeyspaceDiff + { + public final KeyspaceMetadata before; + public final KeyspaceMetadata after; + + public final TablesDiff tables; + public final ViewsDiff views; + public final TypesDiff types; + + public final FunctionsDiff<UDFunction> udfs; + public final FunctionsDiff<UDAggregate> udas; + + private KeyspaceDiff(KeyspaceMetadata before, + KeyspaceMetadata after, + TablesDiff tables, + ViewsDiff views, + TypesDiff types, + FunctionsDiff<UDFunction> udfs, + FunctionsDiff<UDAggregate> udas) + { + this.before = before; + this.after = after; + this.tables = tables; + this.views = views; + this.types = types; + this.udfs = udfs; + this.udas = udas; + } + + private static Optional<KeyspaceDiff> diff(KeyspaceMetadata before, KeyspaceMetadata after) + { + if (before == after) + return Optional.empty(); + + if (!before.name.equals(after.name)) + { + String msg = String.format("Attempting to diff two keyspaces with different names ('%s' and '%s')", before.name, after.name); + throw new IllegalArgumentException(msg); + } + + TablesDiff tables = Tables.diff(before.tables, after.tables); + ViewsDiff views = Views.diff(before.views, after.views); + TypesDiff types = Types.diff(before.types, after.types); + + @SuppressWarnings("unchecked") FunctionsDiff<UDFunction> udfs = FunctionsDiff.NONE; + @SuppressWarnings("unchecked") FunctionsDiff<UDAggregate> udas = FunctionsDiff.NONE; + if (before.functions != after.functions) + { + udfs = Functions.udfsDiff(before.functions, after.functions); + udas = Functions.udasDiff(before.functions, after.functions); + } + + if (before.params.equals(after.params) && tables.isEmpty() && views.isEmpty() && types.isEmpty() && udfs.isEmpty() && udas.isEmpty()) + return Optional.empty(); + + return Optional.of(new KeyspaceDiff(before, after, tables, 
views, types, udfs, udas)); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/KeyspaceParams.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java b/src/java/org/apache/cassandra/schema/KeyspaceParams.java index 1deaa29..68ac5e4 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java @@ -31,8 +31,8 @@ public final class KeyspaceParams public static final boolean DEFAULT_DURABLE_WRITES = true; /** - * This determines durable writes for the {@link org.apache.cassandra.config.SchemaConstants#SCHEMA_KEYSPACE_NAME} - * and {@link org.apache.cassandra.config.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces, + * This determines durable writes for the {@link org.apache.cassandra.schema.SchemaConstants#SCHEMA_KEYSPACE_NAME} + * and {@link org.apache.cassandra.schema.SchemaConstants#SYSTEM_KEYSPACE_NAME} keyspaces, * the only reason it is not final is for commitlog unit tests. It should only be changed for testing purposes. 
*/ @VisibleForTesting http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/Keyspaces.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Keyspaces.java b/src/java/org/apache/cassandra/schema/Keyspaces.java index 1692f88..1938d93 100644 --- a/src/java/org/apache/cassandra/schema/Keyspaces.java +++ b/src/java/org/apache/cassandra/schema/Keyspaces.java @@ -18,18 +18,21 @@ package org.apache.cassandra.schema; import java.util.Iterator; +import java.util.Optional; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; import javax.annotation.Nullable; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; +import com.google.common.collect.*; + +import org.apache.cassandra.schema.KeyspaceMetadata.KeyspaceDiff; public final class Keyspaces implements Iterable<KeyspaceMetadata> { + private static final Keyspaces NONE = builder().build(); + private final ImmutableMap<String, KeyspaceMetadata> keyspaces; private final ImmutableMap<TableId, TableMetadata> tables; @@ -46,7 +49,7 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> public static Keyspaces none() { - return builder().build(); + return NONE; } public static Keyspaces of(KeyspaceMetadata... 
keyspaces) @@ -69,18 +72,39 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> return keyspaces.keySet(); } + /** + * Get the keyspace with the specified name + * + * @param name a non-qualified keyspace name + * @return an empty {@link Optional} if the keyspace name is not found; a non-empty optional of {@link KeyspaceMetadata} otherwise + */ + public Optional<KeyspaceMetadata> get(String name) + { + return Optional.ofNullable(keyspaces.get(name)); + } + @Nullable public KeyspaceMetadata getNullable(String name) { return keyspaces.get(name); } + public boolean containsKeyspace(String name) + { + return keyspaces.containsKey(name); + } + @Nullable public TableMetadata getTableOrViewNullable(TableId id) { return tables.get(id); } + public boolean isEmpty() + { + return keyspaces.isEmpty(); + } + public Keyspaces filter(Predicate<KeyspaceMetadata> predicate) { Builder builder = builder(); @@ -97,19 +121,19 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> if (keyspace == null) throw new IllegalStateException(String.format("Keyspace %s doesn't exists", name)); - return builder().add(filter(k -> k != keyspace)).build(); + return filter(k -> k != keyspace); } public Keyspaces withAddedOrUpdated(KeyspaceMetadata keyspace) { - return builder().add(filter(k -> !k.name.equals(keyspace.name))) + return builder().add(Iterables.filter(this, k -> !k.name.equals(keyspace.name))) .add(keyspace) .build(); } - MapDifference<String, KeyspaceMetadata> diff(Keyspaces other) + public void validate() { - return Maps.difference(keyspaces, other.keyspaces); + keyspaces.values().forEach(KeyspaceMetadata::validate); } @Override @@ -167,4 +191,49 @@ public final class Keyspaces implements Iterable<KeyspaceMetadata> return this; } } + + static KeyspacesDiff diff(Keyspaces before, Keyspaces after) + { + return KeyspacesDiff.diff(before, after); + } + + public static final class KeyspacesDiff + { + static final KeyspacesDiff NONE = new 
KeyspacesDiff(Keyspaces.none(), Keyspaces.none(), ImmutableList.of()); + + public final Keyspaces created; + public final Keyspaces dropped; + public final ImmutableList<KeyspaceDiff> altered; + + private KeyspacesDiff(Keyspaces created, Keyspaces dropped, ImmutableList<KeyspaceDiff> altered) + { + this.created = created; + this.dropped = dropped; + this.altered = altered; + } + + private static KeyspacesDiff diff(Keyspaces before, Keyspaces after) + { + if (before == after) + return NONE; + + Keyspaces created = after.filter(k -> !before.containsKeyspace(k.name)); + Keyspaces dropped = before.filter(k -> !after.containsKeyspace(k.name)); + + ImmutableList.Builder<KeyspaceDiff> altered = ImmutableList.builder(); + before.forEach(keyspaceBefore -> + { + KeyspaceMetadata keyspaceAfter = after.getNullable(keyspaceBefore.name); + if (null != keyspaceAfter) + KeyspaceMetadata.diff(keyspaceBefore, keyspaceAfter).ifPresent(altered::add); + }); + + return new KeyspacesDiff(created, dropped, altered.build()); + } + + public boolean isEmpty() + { + return created.isEmpty() && dropped.isEmpty() && altered.isEmpty(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/207c80c1/src/java/org/apache/cassandra/schema/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java index c8881e5..ac95054 100644 --- a/src/java/org/apache/cassandra/schema/MigrationManager.java +++ b/src/java/org/apache/cassandra/schema/MigrationManager.java @@ -23,15 +23,14 @@ import java.util.concurrent.*; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; +import com.google.common.util.concurrent.Futures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; import 
org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.exceptions.AlreadyExistsException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.*; @@ -41,9 +40,8 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.WrappedRunnable; public class MigrationManager { @@ -150,6 +148,14 @@ public class MigrationManager && !Gossiper.instance.isGossipOnlyMember(endpoint); } + private static boolean shouldPushSchemaTo(InetAddressAndPort endpoint) + { + // only push schema to nodes with known and equal versions + return !endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) + && MessagingService.instance().knowsVersion(endpoint) + && MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version; + } + public static boolean isReadyForBootstrap() { return MigrationTask.getInflightTasks().isEmpty(); @@ -194,21 +200,16 @@ public class MigrationManager announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm, timestamp), announceLocally); } - public static void announceNewTable(TableMetadata cfm) throws ConfigurationException - { - announceNewTable(cfm, false); - } - - public static void announceNewTable(TableMetadata cfm, boolean announceLocally) + public static void announceNewTable(TableMetadata cfm) { - announceNewTable(cfm, announceLocally, true); + announceNewTable(cfm, true, FBUtilities.timestampMicros()); } /** * Announces the table even if the 
definition is already know locally. * This should generally be avoided but is used internally when we want to force the most up to date version of * a system table schema (Note that we don't know if the schema we force _is_ the most recent version or not, we - * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceUpdateColumnFamily, + * just rely on idempotency to basically ignore that announce if it's not. That's why we can't use announceTableUpdate * it would for instance delete new columns if this is not called with the most up-to-date version) * * Note that this is only safe for system tables where we know the id is fixed and will be the same whatever version @@ -216,15 +217,10 @@ public class MigrationManager */ public static void forceAnnounceNewTable(TableMetadata cfm) { - announceNewTable(cfm, false, false, 0); - } - - private static void announceNewTable(TableMetadata cfm, boolean announceLocally, boolean throwOnDuplicate) - { - announceNewTable(cfm, announceLocally, throwOnDuplicate, FBUtilities.timestampMicros()); + announceNewTable(cfm, false, 0); } - private static void announceNewTable(TableMetadata cfm, boolean announceLocally, boolean throwOnDuplicate, long timestamp) + private static void announceNewTable(TableMetadata cfm, boolean throwOnDuplicate, long timestamp) { cfm.validate(); @@ -236,49 +232,10 @@ public class MigrationManager throw new AlreadyExistsException(cfm.keyspace, cfm.name); logger.info("Create new table: {}", cfm); - announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), announceLocally); - } - - public static void announceNewView(ViewMetadata view, boolean announceLocally) throws ConfigurationException - { - view.metadata.validate(); - - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(view.keyspace); - if (ksm == null) - throw new ConfigurationException(String.format("Cannot add table '%s' to non existing keyspace '%s'.", view.name, view.keyspace)); - 
else if (ksm.getTableOrViewNullable(view.name) != null) - throw new AlreadyExistsException(view.keyspace, view.name); - - logger.info("Create new view: {}", view); - announce(SchemaKeyspace.makeCreateViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewType(UserType newType, boolean announceLocally) - { - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(newType.keyspace); - announce(SchemaKeyspace.makeCreateTypeMutation(ksm, newType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewFunction(UDFunction udf, boolean announceLocally) - { - logger.info("Create scalar function '{}'", udf.name()); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); - announce(SchemaKeyspace.makeCreateFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceNewAggregate(UDAggregate udf, boolean announceLocally) - { - logger.info("Create aggregate function '{}'", udf.name()); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); - announce(SchemaKeyspace.makeCreateAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); + announce(SchemaKeyspace.makeCreateTableMutation(ksm, cfm, timestamp), false); } - public static void announceKeyspaceUpdate(KeyspaceMetadata ksm) throws ConfigurationException - { - announceKeyspaceUpdate(ksm, false); - } - - public static void announceKeyspaceUpdate(KeyspaceMetadata ksm, boolean announceLocally) throws ConfigurationException + static void announceKeyspaceUpdate(KeyspaceMetadata ksm) { ksm.validate(); @@ -287,20 +244,15 @@ public class MigrationManager throw new ConfigurationException(String.format("Cannot update non existing keyspace '%s'.", ksm.name)); logger.info("Update Keyspace '{}' From {} To {}", ksm.name, oldKsm, ksm); - announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, 
FBUtilities.timestampMicros()), announceLocally); + announce(SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, FBUtilities.timestampMicros()), false); } - public static void announceTableUpdate(TableMetadata tm) throws ConfigurationException + public static void announceTableUpdate(TableMetadata tm) { announceTableUpdate(tm, false); } - public static void announceTableUpdate(TableMetadata updated, boolean announceLocally) throws ConfigurationException - { - announceTableUpdate(updated, null, announceLocally); - } - - public static void announceTableUpdate(TableMetadata updated, Collection<ViewMetadata> views, boolean announceLocally) throws ConfigurationException + public static void announceTableUpdate(TableMetadata updated, boolean announceLocally) { updated.validate(); @@ -309,69 +261,27 @@ public class MigrationManager throw new ConfigurationException(String.format("Cannot update non existing table '%s' in keyspace '%s'.", updated.name, updated.keyspace)); KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(current.keyspace); - current.validateCompatibility(updated); + updated.validateCompatibility(current); long timestamp = FBUtilities.timestampMicros(); logger.info("Update table '{}/{}' From {} To {}", current.keyspace, current.name, current, updated); Mutation.SimpleBuilder builder = SchemaKeyspace.makeUpdateTableMutation(ksm, current, updated, timestamp); - if (views != null) - views.forEach(view -> addViewUpdateToMutationBuilder(view, builder)); - - announce(builder, announceLocally); - } - - public static void announceViewUpdate(ViewMetadata view, boolean announceLocally) throws ConfigurationException - { - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(view.keyspace); - long timestamp = FBUtilities.timestampMicros(); - Mutation.SimpleBuilder builder = SchemaKeyspace.makeCreateKeyspaceMutation(ksm.name, ksm.params, timestamp); - addViewUpdateToMutationBuilder(view, builder); announce(builder, announceLocally); } - private 
static void addViewUpdateToMutationBuilder(ViewMetadata view, Mutation.SimpleBuilder builder) - { - view.metadata.validate(); - - ViewMetadata oldView = Schema.instance.getView(view.keyspace, view.name); - if (oldView == null) - throw new ConfigurationException(String.format("Cannot update non existing materialized view '%s' in keyspace '%s'.", view.name, view.keyspace)); - - oldView.metadata.validateCompatibility(view.metadata); - - logger.info("Update view '{}/{}' From {} To {}", view.keyspace, view.name, oldView, view); - SchemaKeyspace.makeUpdateViewMutation(builder, oldView, view); - } - - public static void announceTypeUpdate(UserType updatedType, boolean announceLocally) - { - logger.info("Update type '{}.{}' to {}", updatedType.keyspace, updatedType.getNameAsString(), updatedType); - announceNewType(updatedType, announceLocally); - } - - public static void announceKeyspaceDrop(String ksName) throws ConfigurationException - { - announceKeyspaceDrop(ksName, false); - } - - public static void announceKeyspaceDrop(String ksName, boolean announceLocally) throws ConfigurationException + static void announceKeyspaceDrop(String ksName) { KeyspaceMetadata oldKsm = Schema.instance.getKeyspaceMetadata(ksName); if (oldKsm == null) throw new ConfigurationException(String.format("Cannot drop non existing keyspace '%s'.", ksName)); logger.info("Drop Keyspace '{}'", oldKsm.name); - announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceTableDrop(String ksName, String cfName) throws ConfigurationException - { - announceTableDrop(ksName, cfName, false); + announce(SchemaKeyspace.makeDropKeyspaceMutation(oldKsm, FBUtilities.timestampMicros()), false); } - public static void announceTableDrop(String ksName, String cfName, boolean announceLocally) throws ConfigurationException + public static void announceTableDrop(String ksName, String cfName, boolean announceLocally) { TableMetadata tm = 
Schema.instance.getTableMetadata(ksName, cfName); if (tm == null) @@ -382,37 +292,6 @@ public class MigrationManager announce(SchemaKeyspace.makeDropTableMutation(ksm, tm, FBUtilities.timestampMicros()), announceLocally); } - public static void announceViewDrop(String ksName, String viewName, boolean announceLocally) throws ConfigurationException - { - ViewMetadata view = Schema.instance.getView(ksName, viewName); - if (view == null) - throw new ConfigurationException(String.format("Cannot drop non existing materialized view '%s' in keyspace '%s'.", viewName, ksName)); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(ksName); - - logger.info("Drop table '{}/{}'", view.keyspace, view.name); - announce(SchemaKeyspace.makeDropViewMutation(ksm, view, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceTypeDrop(UserType droppedType, boolean announceLocally) - { - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(droppedType.keyspace); - announce(SchemaKeyspace.dropTypeFromSchemaMutation(ksm, droppedType, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceFunctionDrop(UDFunction udf, boolean announceLocally) - { - logger.info("Drop scalar function overload '{}' args '{}'", udf.name(), udf.argTypes()); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); - announce(SchemaKeyspace.makeDropFunctionMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - - public static void announceAggregateDrop(UDAggregate udf, boolean announceLocally) - { - logger.info("Drop aggregate function overload '{}' args '{}'", udf.name(), udf.argTypes()); - KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(udf.name().keyspace); - announce(SchemaKeyspace.makeDropAggregateMutation(ksm, udf, FBUtilities.timestampMicros()), announceLocally); - } - /** * actively announce a new version to active hosts via rpc * @param schema The schema mutation to be applied 
@@ -424,7 +303,7 @@ public class MigrationManager if (announceLocally) Schema.instance.merge(mutations); else - FBUtilities.waitOnFuture(announce(mutations)); + announce(mutations); } private static void pushSchemaMutation(InetAddressAndPort endpoint, Collection<Mutation> schema) @@ -436,38 +315,36 @@ public class MigrationManager } // Returns a future on the local application of the schema - private static Future<?> announce(final Collection<Mutation> schema) + private static void announce(Collection<Mutation> schema) { - Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable() - { - protected void runMayThrow() throws ConfigurationException - { - Schema.instance.mergeAndAnnounceVersion(schema); - } - }); + Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.mergeAndAnnounceVersion(schema)); for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers()) - { - // only push schema to nodes with known and equal versions - if (!endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) && - MessagingService.instance().knowsVersion(endpoint) && - MessagingService.instance().getRawVersion(endpoint) == MessagingService.current_version) + if (shouldPushSchemaTo(endpoint)) pushSchemaMutation(endpoint, schema); - } - return f; + FBUtilities.waitOnFuture(f); } - /** - * Announce my version passively over gossip. - * Used to notify nodes as they arrive in the cluster. 
- * - * @param version The schema version to announce - */ - static void passiveAnnounce(UUID version) + public static KeyspacesDiff announce(SchemaTransformation transformation, boolean locally) { - Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version)); - logger.debug("Gossiping my schema version {}", version); + long now = FBUtilities.timestampMicros(); + + Future<Schema.TransformationResult> future = + StageManager.getStage(Stage.MIGRATION).submit(() -> Schema.instance.transform(transformation, locally, now)); + + Schema.TransformationResult result = Futures.getUnchecked(future); + if (!result.success) + throw result.exception; + + if (locally || result.diff.isEmpty()) + return result.diff; + + for (InetAddressAndPort endpoint : Gossiper.instance.getLiveMembers()) + if (shouldPushSchemaTo(endpoint)) + pushSchemaMutation(endpoint, result.mutations); + + return result.diff; } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
