http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java index 8b42e51..4672451 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDFunction.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDFunction.java @@ -30,11 +30,7 @@ import com.datastax.driver.core.UserType; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ByteBufferUtil; @@ -47,9 +43,10 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct protected static final Logger logger = LoggerFactory.getLogger(UDFunction.class); protected final List<ColumnIdentifier> argNames; + protected final String language; protected final String body; - private final boolean deterministic; + protected final boolean isDeterministic; protected final DataType[] argDataTypes; protected final DataType returnDataType; @@ -60,10 +57,10 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct AbstractType<?> returnType, String language, String body, - boolean deterministic) + boolean isDeterministic) { this(name, argNames, argTypes, UDHelper.driverTypes(argTypes), returnType, - UDHelper.driverType(returnType), language, body, deterministic); + UDHelper.driverType(returnType), language, body, isDeterministic); } protected UDFunction(FunctionName name, @@ -74,14 +71,14 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct DataType returnDataType, String language, String body, - boolean deterministic) + boolean isDeterministic) { super(name, argTypes, returnType); assert new HashSet<>(argNames).size() == argNames.size() : "duplicate argument names"; this.argNames = argNames; this.language = language; this.body = body; - this.deterministic = deterministic; + this.isDeterministic = isDeterministic; this.argDataTypes = argDataTypes; this.returnDataType = returnDataType; } @@ -92,13 +89,13 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct AbstractType<?> returnType, String language, String body, - boolean deterministic) + boolean isDeterministic) throws InvalidRequestException { switch (language) { - case "java": return JavaSourceUDFFactory.buildUDF(name, argNames, argTypes, returnType, body, deterministic); - default: return new ScriptBasedUDF(name, argNames, argTypes, returnType, language, body, deterministic); + case "java": return JavaSourceUDFFactory.buildUDF(name, argNames, argTypes, returnType, body, isDeterministic); + default: return new ScriptBasedUDF(name, argNames, argTypes, returnType, language, body, isDeterministic); } } @@ -111,24 +108,27 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct * 2) we return a meaningful error message if the function is executed (something more precise * than saying that the function doesn't exist) */ - private static UDFunction createBrokenFunction(FunctionName name, - List<ColumnIdentifier> argNames, - List<AbstractType<?>> argTypes, - AbstractType<?> returnType, - String language, - String body, - final InvalidRequestException reason) + public static UDFunction createBrokenFunction(FunctionName name, + List<ColumnIdentifier> argNames, + List<AbstractType<?>> argTypes, + AbstractType<?> returnType, + String language, + String body, + final InvalidRequestException reason) { return new UDFunction(name, argNames, argTypes, returnType, language, body, true) { public ByteBuffer execute(int protocolVersion, List<ByteBuffer> parameters) throws InvalidRequestException { - throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully for the following reason: %s. " - + "Please see the server log for more details", this, reason.getMessage())); + throw new InvalidRequestException(String.format("Function '%s' exists but hasn't been loaded successfully " + + "for the following reason: %s. Please see the server log for details", + this, + reason.getMessage())); } }; } + public boolean isAggregate() { return false; @@ -136,7 +136,7 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct public boolean isPure() { - return deterministic; + return isDeterministic; } public boolean isNative() @@ -144,13 +144,33 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct return false; } + public List<ColumnIdentifier> argNames() + { + return argNames; + } + + public boolean isDeterministic() + { + return isDeterministic; + } + + public String body() + { + return body; + } + + public String language() + { + return language; + } + /** * Used by UDF implementations (both Java code generated by {@link org.apache.cassandra.cql3.functions.JavaSourceUDFFactory} * and script executor {@link org.apache.cassandra.cql3.functions.ScriptBasedUDF}) to convert the C* * serialized representation to the Java object representation. * * @param protocolVersion the native protocol version used for serialization - * @param argIndex index of the UDF input argument + * @param argIndex index of the UDF input argument */ protected Object compose(int protocolVersion, int argIndex, ByteBuffer value) { @@ -169,117 +189,6 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct return value == null ? null : returnDataType.serialize(value, ProtocolVersion.fromInt(protocolVersion)); } - private static Mutation makeSchemaMutation(FunctionName name) - { - UTF8Type kv = (UTF8Type)SystemKeyspace.SchemaFunctionsTable.getKeyValidator(); - return new Mutation(SystemKeyspace.NAME, kv.decompose(name.keyspace)); - } - - public Mutation toSchemaDrop(long timestamp) - { - Mutation mutation = makeSchemaMutation(name); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE); - - Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes)); - int ldt = (int) (System.currentTimeMillis() / 1000); - cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - return mutation; - } - - public static Map<Composite, UDFunction> fromSchema(Row row) - { - UntypedResultSet results = QueryProcessor.resultify("SELECT * FROM system." + SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, row); - Map<Composite, UDFunction> udfs = new HashMap<>(results.size()); - for (UntypedResultSet.Row result : results) - udfs.put(SystemKeyspace.SchemaFunctionsTable.comparator.make(result.getString("function_name"), result.getBlob("signature")), - fromSchema(result)); - return udfs; - } - - public Mutation toSchemaUpdate(long timestamp) - { - Mutation mutation = makeSchemaMutation(name); - ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE); - - Composite prefix = SystemKeyspace.SchemaFunctionsTable.comparator.make(name.name, UDHelper.computeSignature(argTypes)); - CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp); - - adder.resetCollection("argument_names"); - adder.resetCollection("argument_types"); - adder.add("return_type", returnType.toString()); - adder.add("language", language); - adder.add("body", body); - adder.add("deterministic", deterministic); - - for (int i = 0; i < argNames.size(); i++) - { - adder.addListEntry("argument_names", argNames.get(i).bytes); - adder.addListEntry("argument_types", argTypes.get(i).toString()); - } - - return mutation; - } - - public static UDFunction fromSchema(UntypedResultSet.Row row) - { - String ksName = row.getString("keyspace_name"); - String functionName = row.getString("function_name"); - FunctionName name = new FunctionName(ksName, functionName); - - List<String> names = row.getList("argument_names", UTF8Type.instance); - List<String> types = row.getList("argument_types", UTF8Type.instance); - - List<ColumnIdentifier> argNames; - if (names == null) - argNames = Collections.emptyList(); - else - { - argNames = new ArrayList<>(names.size()); - for (String arg : names) - argNames.add(new ColumnIdentifier(arg, true)); - } - - List<AbstractType<?>> argTypes; - if (types == null) - argTypes = Collections.emptyList(); - else - { - argTypes = new ArrayList<>(types.size()); - for (String type : types) - argTypes.add(parseType(type)); - } - - AbstractType<?> returnType = parseType(row.getString("return_type")); - - boolean deterministic = row.getBoolean("deterministic"); - String language = row.getString("language"); - String body = row.getString("body"); - - try - { - return create(name, argNames, argTypes, returnType, language, body, deterministic); - } - catch (InvalidRequestException e) - { - logger.error(String.format("Cannot load function '%s' from schema: this function won't be available (on this node)", name), e); - return createBrokenFunction(name, argNames, argTypes, returnType, language, body, e); - } - } - - private static AbstractType<?> parseType(String str) - { - // We only use this when reading the schema where we shouldn't get an error - try - { - return TypeParser.parse(str); - } - catch (SyntaxException | ConfigurationException e) - { - throw new RuntimeException(e); - } - } - @Override public boolean equals(Object o) { @@ -287,19 +196,19 @@ public abstract class UDFunction extends AbstractFunction implements ScalarFunct return false; UDFunction that = (UDFunction)o; - return Objects.equal(this.name, that.name) - && Functions.typeEquals(this.argTypes, that.argTypes) - && Functions.typeEquals(this.returnType, that.returnType) - && Objects.equal(this.argNames, that.argNames) - && Objects.equal(this.language, that.language) - && Objects.equal(this.body, that.body) - && Objects.equal(this.deterministic, that.deterministic); + return Objects.equal(name, that.name) + && Objects.equal(argNames, that.argNames) + && Functions.typeEquals(argTypes, that.argTypes) + && Functions.typeEquals(returnType, that.returnType) + && Objects.equal(language, that.language) + && Objects.equal(body, that.body) + && Objects.equal(isDeterministic, that.isDeterministic); } @Override public int hashCode() { - return Objects.hashCode(name, argTypes, returnType, argNames, language, body, deterministic); + return Objects.hashCode(name, argNames, argTypes, returnType, language, body, isDeterministic); } public void userTypeUpdated(String ksName, String typeName)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/functions/UDHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java index 2a17c75..0738cbe 100644 --- a/src/java/org/apache/cassandra/cql3/functions/UDHelper.java +++ b/src/java/org/apache/cassandra/cql3/functions/UDHelper.java @@ -31,12 +31,13 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.DataType; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.utils.FBUtilities; /** * Helper class for User Defined Functions + Aggregates. */ -final class UDHelper +public final class UDHelper { protected static final Logger logger = LoggerFactory.getLogger(UDHelper.class); @@ -112,12 +113,13 @@ final class UDHelper // we use a "signature" which is just a SHA-1 of it's argument types (we could replace that by // using a "signature" UDT that would be comprised of the function name and argument types, // which we could then use as clustering column. But as we haven't yet used UDT in system tables, - // We'll left that decision to #6717). - protected static ByteBuffer computeSignature(List<AbstractType<?>> argTypes) + // We'll leave that decision to #6717). + public static ByteBuffer calculateSignature(AbstractFunction fun) { MessageDigest digest = FBUtilities.newMessageDigest("SHA-1"); - for (AbstractType<?> type : argTypes) - digest.update(type.asCQL3Type().toString().getBytes(StandardCharsets.UTF_8)); + digest.update(UTF8Type.instance.decompose(fun.name().name)); + for (AbstractType<?> type : fun.argTypes()) + digest.update(UTF8Type.instance.decompose(type.asCQL3Type().toString())); return ByteBuffer.wrap(digest.digest()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java index 6aea3b1..c8c2474 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java @@ -151,14 +151,32 @@ public class CreateTableStatement extends SchemaAlteringStatement .addAllColumnDefinitions(getColumns(cfmd)) .isDense(isDense); - cfmd.addColumnMetadataFromAliases(keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY); - cfmd.addColumnMetadataFromAliases(columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN); + addColumnMetadataFromAliases(cfmd, keyAliases, keyValidator, ColumnDefinition.Kind.PARTITION_KEY); + addColumnMetadataFromAliases(cfmd, columnAliases, comparator.asAbstractType(), ColumnDefinition.Kind.CLUSTERING_COLUMN); if (valueAlias != null) - cfmd.addColumnMetadataFromAliases(Collections.<ByteBuffer>singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE); + addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE); properties.applyToCFMetadata(cfmd); } + private void addColumnMetadataFromAliases(CFMetaData cfm, List<ByteBuffer> aliases, AbstractType<?> comparator, ColumnDefinition.Kind kind) + { + if (comparator instanceof CompositeType) + { + CompositeType ct = (CompositeType)comparator; + for (int i = 0; i < aliases.size(); ++i) + if (aliases.get(i) != null) + cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(i), ct.types.get(i), i, kind)); + } + else + { + assert aliases.size() <= 1; + if (!aliases.isEmpty() && aliases.get(0) != null) + cfm.addOrReplaceColumnDefinition(new ColumnDefinition(cfm, aliases.get(0), comparator, null, kind)); + } + } + + public static class RawStatement extends CFStatement { private final Map<ColumnIdentifier, CQL3Type.Raw> definitions = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index 1fcd63c..e766f65 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -35,6 +35,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; @@ -59,7 +60,7 @@ import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; */ public class AtomicBTreeColumns extends ColumnFamily { - static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(SystemKeyspace.BuiltIndexesTable, null)) + static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.denseCFMetaData("keyspace", "table", BytesType.instance), null)) + ObjectSizes.measure(new Holder(null, null)); // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index b33e457..e71a62c 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -102,7 +102,7 @@ public class BatchlogManager implements BatchlogManagerMBean public int countAllBatches() { - String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG_TABLE); + String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHLOG); return (int) executeInternal(query).one().getLong("count"); } @@ -137,8 +137,8 @@ public class BatchlogManager implements BatchlogManagerMBean @VisibleForTesting static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid, int version, long now) { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.BatchlogTable); - CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.BatchlogTable.comparator.builder().build(), now); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog); + CFRowAdder adder = new CFRowAdder(cf, SystemKeyspace.Batchlog.comparator.builder().build(), now); adder.add("data", serializeMutations(mutations, version)) .add("written_at", new Date(now / 1000)) .add("version", version); @@ -174,7 +174,7 @@ public class BatchlogManager implements BatchlogManagerMBean UntypedResultSet page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s LIMIT %d", SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG_TABLE, + SystemKeyspace.BATCHLOG, PAGE_SIZE)); while (!page.isEmpty()) @@ -186,7 +186,7 @@ public class BatchlogManager implements BatchlogManagerMBean page = executeInternal(String.format("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(?) LIMIT %d", SystemKeyspace.NAME, - SystemKeyspace.BATCHLOG_TABLE, + SystemKeyspace.BATCHLOG, PAGE_SIZE), id); } @@ -199,7 +199,7 @@ public class BatchlogManager implements BatchlogManagerMBean private void deleteBatch(UUID id) { Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(id)); - mutation.delete(SystemKeyspace.BATCHLOG_TABLE, FBUtilities.timestampMicros()); + mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros()); mutation.apply(); } @@ -447,7 +447,7 @@ public class BatchlogManager implements BatchlogManagerMBean // force flush + compaction to reclaim space from the replayed batches private void cleanup() throws ExecutionException, InterruptedException { - ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG_TABLE); + ColumnFamilyStore cfs = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHLOG); cfs.forceBlockingFlush(); Collection<Descriptor> descriptors = new ArrayList<>(); for (SSTableReader sstr : cfs.getSSTables()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java index 5cb62ed..d5ede03 100644 --- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java +++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java @@ -26,6 +26,7 @@ import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.utils.WrappedRunnable; /** @@ -46,7 +47,7 @@ public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mut { public void runMayThrow() throws Exception { - DefsTables.mergeSchema(message.payload); + LegacySchemaTables.mergeSchema(message.payload); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/DefsTables.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java deleted file mode 100644 index 82a5dd1..0000000 --- a/src/java/org/apache/cassandra/db/DefsTables.java +++ /dev/null @@ -1,622 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.config.UTMetaData; -import org.apache.cassandra.cql3.functions.Functions; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** - * SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema - * load/distribution easy, it replaces old mechanism when local migrations where serialized, stored in system.Migrations - * and used for schema distribution. - */ -public class DefsTables -{ - private static final Logger logger = LoggerFactory.getLogger(DefsTables.class); - - /** - * Load keyspace definitions for the system keyspace (system.SCHEMA_KEYSPACES_TABLE) - * - * @return Collection of found keyspace definitions - */ - public static Collection<KSMetaData> loadFromKeyspace() - { - List<Row> serializedSchema = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE); - - List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size()); - - for (Row row : serializedSchema) - { - if (Schema.invalidSchemaRow(row) || Schema.ignoredSchemaRow(row)) - continue; - - keyspaces.add(KSMetaData.fromSchema(row, serializedColumnFamilies(row.key), serializedUserTypes(row.key))); - } - - return keyspaces; - } - - private static Row serializedColumnFamilies(DecoratedKey ksNameKey) - { - ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE); - return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, - SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, - System.currentTimeMillis()))); - } - - private static Row serializedUserTypes(DecoratedKey ksNameKey) - { - ColumnFamilyStore cfsStore = SystemKeyspace.schemaCFS(SystemKeyspace.SCHEMA_USER_TYPES_TABLE); - return new Row(ksNameKey, cfsStore.getColumnFamily(QueryFilter.getIdentityFilter(ksNameKey, - SystemKeyspace.SCHEMA_USER_TYPES_TABLE, - System.currentTimeMillis()))); - } - - /** - * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects - * (which also involves fs operations on add/drop ks/cf) - * - * @param mutations the schema changes to apply - * - * @throws ConfigurationException If one of metadata attributes has invalid value - * @throws IOException If data was corrupted during transportation or failed to apply fs operations - */ - public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException - { - mergeSchemaInternal(mutations, true); - Schema.instance.updateVersionAndAnnounce(); - } - - public static synchronized void mergeSchemaInternal(Collection<Mutation> mutations, boolean doFlush) throws IOException - { - // compare before/after schemas of the affected keyspaces only - Set<String> keyspaces = new HashSet<>(mutations.size()); - for (Mutation mutation : mutations) - keyspaces.add(ByteBufferUtil.string(mutation.key())); - - // current state of the schema - Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> oldTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> oldFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> oldAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces); - - for (Mutation mutation : mutations) - mutation.apply(); - - if (doFlush) - flushSchemaCFs(); - - // with new data applied - Map<DecoratedKey, ColumnFamily> newKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> newColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> newTypes = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_USER_TYPES_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> newFunctions = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_FUNCTIONS_TABLE, keyspaces); - Map<DecoratedKey, ColumnFamily> newAggregates = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_AGGREGATES_TABLE, keyspaces); - - Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces); - mergeColumnFamilies(oldColumnFamilies, newColumnFamilies); - mergeTypes(oldTypes, newTypes); - mergeFunctions(oldFunctions, newFunctions); - mergeAggregates(oldAggregates, newAggregates); - - // it is safe to drop a keyspace only when all nested ColumnFamilies where deleted - for (String keyspaceToDrop : keyspacesToDrop) - dropKeyspace(keyspaceToDrop); - } - - private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) - { - List<Row> created = new ArrayList<>(); - List<String> altered = new ArrayList<>(); - Set<String> dropped = new HashSet<>(); - - /* - * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us - * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily - * there that only has the top-level deletion, if: - * a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place - * b) a pulled dropped keyspace that got dropped before it could find a way to this node - * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns: - * that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way - * to this node - */ - MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); - - for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.add(new Row(entry.getKey(), entry.getValue())); - - for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) - { - String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); - - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - altered.add(keyspaceName); - else if (pre.hasColumns()) - dropped.add(keyspaceName); - else if (post.hasColumns()) // a (re)created keyspace - created.add(new Row(entry.getKey(), post)); - } - - for (Row row : created) - addKeyspace(KSMetaData.fromSchema(row, Collections.<CFMetaData>emptyList(), new UTMetaData())); - for (String name : altered) - updateKeyspace(name); - return dropped; - } - - // see the comments for mergeKeyspaces() - private static void mergeColumnFamilies(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) - { - List<CFMetaData> created = new ArrayList<>(); - List<CFMetaData> altered = new ArrayList<>(); - List<CFMetaData> dropped = new ArrayList<>(); - - MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); - - for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.addAll(KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), entry.getValue())).values()); - - for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) - { - String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); - - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - { - MapDifference<String, CFMetaData> delta = - Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(), - KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), post))); - - dropped.addAll(delta.entriesOnlyOnLeft().values()); - created.addAll(delta.entriesOnlyOnRight().values()); - Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>() - { - public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair) - { - return pair.rightValue(); - } - })); - } - else if (pre.hasColumns()) - { - dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values()); - } - else if (post.hasColumns()) - { - created.addAll(KSMetaData.deserializeColumnFamilies(new Row(entry.getKey(), post)).values()); - } - } - - for (CFMetaData cfm : created) - addColumnFamily(cfm); - for (CFMetaData cfm : altered) - updateColumnFamily(cfm.ksName, cfm.cfName); - for (CFMetaData cfm : dropped) - dropColumnFamily(cfm.ksName, cfm.cfName); - } - - // see the comments for mergeKeyspaces() - private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) - { - List<UserType> created = new ArrayList<>(); - List<UserType> altered = new ArrayList<>(); - List<UserType> dropped = new ArrayList<>(); - - MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); - - // New keyspace with types - for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.addAll(UTMetaData.fromSchema(new Row(entry.getKey(), entry.getValue())).values()); - - for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) - { - String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey()); - - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - { - MapDifference<ByteBuffer, UserType> delta = - Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(), - UTMetaData.fromSchema(new Row(entry.getKey(), post))); - - dropped.addAll(delta.entriesOnlyOnLeft().values()); - created.addAll(delta.entriesOnlyOnRight().values()); - Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>() - { - public UserType apply(MapDifference.ValueDifference<UserType> pair) - { - return pair.rightValue(); - } - })); - } - else if (pre.hasColumns()) - { - dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values()); - } - else if (post.hasColumns()) - { - created.addAll(UTMetaData.fromSchema(new Row(entry.getKey(), post)).values()); - } - } - - for (UserType type : created) - addType(type); - for (UserType type : altered) - updateType(type); - for (UserType type : dropped) - dropType(type); - } - - // see the comments for mergeKeyspaces() - private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) - { - List<UDFunction> created = new ArrayList<>(); - List<UDFunction> altered = new ArrayList<>(); - List<UDFunction> dropped = new ArrayList<>(); - - MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); - - // New keyspace with functions - for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.addAll(UDFunction.fromSchema(new Row(entry.getKey(), entry.getValue())).values()); - - for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) - { - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - { - MapDifference<Composite, UDFunction> delta = - Maps.difference(UDFunction.fromSchema(new Row(entry.getKey(), pre)), - UDFunction.fromSchema(new Row(entry.getKey(), post))); - - dropped.addAll(delta.entriesOnlyOnLeft().values()); - created.addAll(delta.entriesOnlyOnRight().values()); - Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>() - { - public UDFunction apply(MapDifference.ValueDifference<UDFunction> pair) - { - return pair.rightValue(); - } - })); - } - else if (pre.hasColumns()) - { - dropped.addAll(UDFunction.fromSchema(new Row(entry.getKey(), pre)).values()); - } - else if (post.hasColumns()) - { - created.addAll(UDFunction.fromSchema(new Row(entry.getKey(), post)).values()); - } - } - - for (UDFunction udf : created) - addFunction(udf); - for (UDFunction udf : altered) - updateFunction(udf); - for (UDFunction udf : dropped) - dropFunction(udf); - } - - // see the comments for mergeKeyspaces() - private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after) - { - List<UDAggregate> created = new ArrayList<>(); - List<UDAggregate> altered = new ArrayList<>(); - List<UDAggregate> dropped = new ArrayList<>(); - - MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after); - - // New keyspace with functions - for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet()) - if (entry.getValue().hasColumns()) - created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), entry.getValue())).values()); - - for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet()) - { - ColumnFamily pre = entry.getValue().leftValue(); - ColumnFamily post = entry.getValue().rightValue(); - - if (pre.hasColumns() && post.hasColumns()) - { - MapDifference<Composite, UDAggregate> delta = - Maps.difference(UDAggregate.fromSchema(new Row(entry.getKey(), pre)), - UDAggregate.fromSchema(new Row(entry.getKey(), post))); - - dropped.addAll(delta.entriesOnlyOnLeft().values()); - created.addAll(delta.entriesOnlyOnRight().values()); - Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>() - { - public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair) - { - return pair.rightValue(); - } - })); - } - else if (pre.hasColumns()) - { - dropped.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), pre)).values()); - } - else if (post.hasColumns()) - { - created.addAll(UDAggregate.fromSchema(new Row(entry.getKey(), post)).values()); - } - } - - for (UDAggregate udf : created) - addAggregate(udf); - for (UDAggregate udf : altered) - updateAggregate(udf); - for (UDAggregate udf : dropped) - dropAggregate(udf); - } - - private static void addKeyspace(KSMetaData ksm) - { - assert Schema.instance.getKSMetaData(ksm.name) == null; - Schema.instance.load(ksm); - - Keyspace.open(ksm.name); - MigrationManager.instance.notifyCreateKeyspace(ksm); - } - - private static void addColumnFamily(CFMetaData cfm) - { - assert Schema.instance.getCFMetaData(cfm.ksName, cfm.cfName) == null; - KSMetaData ksm = Schema.instance.getKSMetaData(cfm.ksName); - ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(cfm))); - - logger.info("Loading {}", cfm); - - Schema.instance.load(cfm); - - // make sure it's init-ed w/ the old definitions first, - // since we're going to call initCf on the new one manually - Keyspace.open(cfm.ksName); - - Schema.instance.setKeyspaceDefinition(ksm); - Keyspace.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true); - MigrationManager.instance.notifyCreateColumnFamily(cfm); - } - - private static void addType(UserType ut) - { - KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace); - assert ksm != null; - - logger.info("Loading {}", ut); - - ksm.userTypes.addType(ut); - - MigrationManager.instance.notifyCreateUserType(ut); - } - - private static void addFunction(UDFunction udf) - { - logger.info("Loading {}", udf); - - Functions.addFunction(udf); - - MigrationManager.instance.notifyCreateFunction(udf); - } - - private static void addAggregate(UDAggregate udf) - { - logger.info("Loading {}", udf); - - Functions.addFunction(udf); - - MigrationManager.instance.notifyCreateAggregate(udf); - } - - private static void updateKeyspace(String ksName) - { - KSMetaData oldKsm = Schema.instance.getKSMetaData(ksName); - assert oldKsm != null; - KSMetaData newKsm = KSMetaData.cloneWith(oldKsm.reloadAttributes(), oldKsm.cfMetaData().values()); - - Schema.instance.setKeyspaceDefinition(newKsm); - - Keyspace.open(ksName).createReplicationStrategy(newKsm); - MigrationManager.instance.notifyUpdateKeyspace(newKsm); - } - - private static void updateColumnFamily(String ksName, String cfName) - { - CFMetaData cfm = Schema.instance.getCFMetaData(ksName, cfName); - assert cfm != null; - cfm.reload(); - - Keyspace keyspace = Keyspace.open(cfm.ksName); - keyspace.getColumnFamilyStore(cfm.cfName).reload(); - MigrationManager.instance.notifyUpdateColumnFamily(cfm); - } - - private static void updateType(UserType ut) - { - KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace); - assert ksm != null; - - logger.info("Updating {}", ut); - - ksm.userTypes.addType(ut); - - MigrationManager.instance.notifyUpdateUserType(ut); - } - - private static void updateFunction(UDFunction udf) - { - logger.info("Updating {}", udf); - - Functions.replaceFunction(udf); - - MigrationManager.instance.notifyUpdateFunction(udf); - } - - private static void updateAggregate(UDAggregate udf) - { - logger.info("Updating {}", udf); - - Functions.replaceFunction(udf); - - MigrationManager.instance.notifyUpdateAggregate(udf); - } - - private static void dropKeyspace(String ksName) - { - KSMetaData ksm = Schema.instance.getKSMetaData(ksName); - String snapshotName = Keyspace.getTimestampedSnapshotName(ksName); - - CompactionManager.instance.interruptCompactionFor(ksm.cfMetaData().values(), true); - - Keyspace keyspace = Keyspace.open(ksm.name); - - // remove all cfs from the keyspace instance. - List<UUID> droppedCfs = new ArrayList<>(); - for (CFMetaData cfm : ksm.cfMetaData().values()) - { - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfm.cfName); - - Schema.instance.purge(cfm); - - if (DatabaseDescriptor.isAutoSnapshot()) - cfs.snapshot(snapshotName); - Keyspace.open(ksm.name).dropCf(cfm.cfId); - - droppedCfs.add(cfm.cfId); - } - - // remove the keyspace from the static instances. - Keyspace.clear(ksm.name); - Schema.instance.clearKeyspaceDefinition(ksm); - - keyspace.writeOrder.awaitNewBarrier(); - - // force a new segment in the CL - CommitLog.instance.forceRecycleAllSegments(droppedCfs); - - MigrationManager.instance.notifyDropKeyspace(ksm); - } - - private static void dropColumnFamily(String ksName, String cfName) - { - KSMetaData ksm = Schema.instance.getKSMetaData(ksName); - assert ksm != null; - ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName); - assert cfs != null; - - // reinitialize the keyspace. - CFMetaData cfm = ksm.cfMetaData().get(cfName); - - Schema.instance.purge(cfm); - Schema.instance.setKeyspaceDefinition(makeNewKeyspaceDefinition(ksm, cfm)); - - CompactionManager.instance.interruptCompactionFor(Arrays.asList(cfm), true); - - if (DatabaseDescriptor.isAutoSnapshot()) - cfs.snapshot(Keyspace.getTimestampedSnapshotName(cfs.name)); - Keyspace.open(ksm.name).dropCf(cfm.cfId); - MigrationManager.instance.notifyDropColumnFamily(cfm); - - CommitLog.instance.forceRecycleAllSegments(Collections.singleton(cfm.cfId)); - } - - private static void dropType(UserType ut) - { - KSMetaData ksm = Schema.instance.getKSMetaData(ut.keyspace); - assert ksm != null; - - ksm.userTypes.removeType(ut); - - MigrationManager.instance.notifyDropUserType(ut); - } - - private static void dropFunction(UDFunction udf) - { - logger.info("Drop {}", udf); - - // TODO: this is kind of broken as this remove all overloads of the function name - Functions.removeFunction(udf.name(), udf.argTypes()); - - MigrationManager.instance.notifyDropFunction(udf); - } - - private static void dropAggregate(UDAggregate udf) - { - logger.info("Drop {}", udf); - - // TODO: this is kind of broken as this remove all overloads of the function name - Functions.removeFunction(udf.name(), udf.argTypes()); - - MigrationManager.instance.notifyDropAggregate(udf); - } - - private static KSMetaData makeNewKeyspaceDefinition(KSMetaData ksm, CFMetaData toExclude) - { - // clone ksm but do not include the new def - List<CFMetaData> newCfs = new ArrayList<>(ksm.cfMetaData().values()); - newCfs.remove(toExclude); - assert newCfs.size() == ksm.cfMetaData().size() - 1; - return KSMetaData.cloneWith(ksm, newCfs); - } - - private static void flushSchemaCFs() - { - for (String cf : SystemKeyspace.ALL_SCHEMA_TABLES) - SystemKeyspace.forceBlockingFlush(cf); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 081e01b..8c4477b 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -115,7 +115,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal"); - private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS_TABLE); + private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); /** * Returns a mutation representing a Hint to be sent to <code>targetId</code> @@ -134,9 +134,9 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - CellName name = SystemKeyspace.HintsTable.comparator.makeCellName(hintId, MessagingService.current_version); + CellName name = SystemKeyspace.Hints.comparator.makeCellName(hintId, MessagingService.current_version); ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS_TABLE)); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(SystemKeyspace.NAME, SystemKeyspace.HINTS)); cf.addColumn(name, value, now, ttl); return new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(targetId), cf); } @@ -182,7 +182,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp) { Mutation mutation = new Mutation(SystemKeyspace.NAME, tokenBytes); - mutation.delete(SystemKeyspace.HINTS_TABLE, columnName, timestamp); + mutation.delete(SystemKeyspace.HINTS, columnName, timestamp); mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } @@ -207,7 +207,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint); ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); final Mutation mutation = new Mutation(SystemKeyspace.NAME, hostIdBytes); - mutation.delete(SystemKeyspace.HINTS_TABLE, System.currentTimeMillis()); + mutation.delete(SystemKeyspace.HINTS, System.currentTimeMillis()); // execute asynchronously to avoid blocking caller (which may be processing gossip) Runnable runnable = new Runnable() @@ -241,7 +241,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean try { logger.info("Truncating all stored hints."); - Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS_TABLE).truncateBlocking(); + Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS).truncateBlocking(); } catch (Exception e) { @@ -375,7 +375,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { long now = System.currentTimeMillis(); QueryFilter filter = QueryFilter.getSliceFilter(epkey, - SystemKeyspace.HINTS_TABLE, + SystemKeyspace.HINTS, startColumn, Composites.EMPTY, false, @@ -601,7 +601,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean try { RangeSliceCommand cmd = new RangeSliceCommand(SystemKeyspace.NAME, - SystemKeyspace.HINTS_TABLE, + SystemKeyspace.HINTS, System.currentTimeMillis(), predicate, range, http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 09fc338..b34d589 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -32,7 +32,6 @@ import java.util.concurrent.Future; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +44,7 @@ import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.QueryPagers; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 4cf441e..2381f26 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -357,7 +357,7 @@ public class Memtable // and BL data is strictly local, so we don't need to preserve tombstones for repair. // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it. // See CASSANDRA-4667. - if (cfs.name.equals(SystemKeyspace.BATCHLOG_TABLE) && cfs.keyspace.getName().equals(SystemKeyspace.NAME)) + if (cfs.name.equals(SystemKeyspace.BATCHLOG) && cfs.keyspace.getName().equals(SystemKeyspace.NAME)) continue; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java index d4503ba..79753c1 100644 --- a/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java +++ b/src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java @@ -26,6 +26,7 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.service.MigrationManager; /** @@ -40,7 +41,7 @@ public class MigrationRequestVerbHandler implements IVerbHandler { logger.debug("Received migration request from {}.", message.from); MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE, - SystemKeyspace.serializeSchema(), + LegacySchemaTables.convertSchemaToMutations(), MigrationManager.MigrationsSerializer.instance); MessagingService.instance().sendReply(response, id, message.from); }
