http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 4db1cfb..87d186c 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -31,7 +31,8 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.tracing.Tracing;
@@ -121,18 +122,18 @@ public class Validator implements Runnable
      *
      * @param row Row to add hash
      */
-    public void add(AbstractCompactedRow row)
+    public void add(UnfilteredRowIterator partition)
     {
-        assert desc.range.contains(row.key.getToken()) : row.key.getToken() + " is not contained in " + desc.range;
-        assert lastKey == null || lastKey.compareTo(row.key) < 0
-               : "row " + row.key + " received out of order wrt " + lastKey;
-        lastKey = row.key;
+        assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range;
+        assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
+               : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
+        lastKey = partition.partitionKey();
 
         if (range == null)
             range = ranges.next();
 
         // generate new ranges as long as case 1 is true
-        while (!range.contains(row.key.getToken()))
+        while (!range.contains(lastKey.getToken()))
         {
             // add the empty hash, and move to the next range
             range.ensureHashInitialised();
@@ -140,7 +141,7 @@ public class Validator implements Runnable
         }
 
         // case 3 must be true: mix in the hashed row
-        RowHash rowHash = rowHash(row);
+        RowHash rowHash = rowHash(partition);
         if (rowHash != null)
         {
             range.addHash(rowHash);
@@ -186,21 +187,16 @@ public class Validator implements Runnable
 
     }
 
-    private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+    private MerkleTree.RowHash rowHash(UnfilteredRowIterator partition)
     {
         validated++;
         // MerkleTree uses XOR internally, so we want lots of output bits here
         CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
-        row.update(digest);
+        UnfilteredRowIterators.digest(partition, digest);
         // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
-        if (digest.count > 0)
-        {
-            return new MerkleTree.RowHash(row.key.getToken(), digest.digest(), digest.count);
-        }
-        else
-        {
-            return null;
-        }
+        return digest.count > 0
+             ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)
+             : null;
     }
 
     /**
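
A note on the validation change above: the new add()/rowHash() path feeds each partition through a counting digest and only mixes a hash into the Merkle tree when the digest actually consumed bytes (the CASSANDRA-8979 guard). Below is a minimal, JDK-only sketch of that pattern; CountingSha256, hashPartition and the String rows are illustrative stand-ins, not the Cassandra classes used in the patch.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;

    // Stand-in for Cassandra's CountingDigest: tracks how many bytes were
    // mixed in, so empty partitions can be told apart from real ones.
    final class CountingSha256
    {
        final MessageDigest digest;
        long count;

        CountingSha256() throws NoSuchAlgorithmException
        {
            digest = MessageDigest.getInstance("SHA-256");
        }

        void update(byte[] bytes)
        {
            digest.update(bytes);
            count += bytes.length;
        }
    }

    public class PartitionHashSketch
    {
        // Returns null for a partition that contributed no bytes, mirroring
        // the "only return a hash if the digest was updated" guard.
        static byte[] hashPartition(List<String> rows) throws NoSuchAlgorithmException
        {
            CountingSha256 digest = new CountingSha256();
            for (String row : rows)
                digest.update(row.getBytes(StandardCharsets.UTF_8));
            return digest.count > 0 ? digest.digest.digest() : null;
        }

        public static void main(String[] args) throws NoSuchAlgorithmException
        {
            System.out.println(Arrays.toString(hashPartition(Arrays.asList("k1:v1", "k1:v2"))));
            System.out.println(hashPartition(Arrays.asList())); // null: nothing digested
        }
    }
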
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
index b8f6421..1348d12 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaTables.java
@@ -23,9 +23,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -34,17 +33,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
-import org.apache.cassandra.cql3.functions.*;
+import org.apache.cassandra.cql3.functions.AbstractFunction;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.compress.CompressionParameters;
@@ -52,6 +48,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
@@ -100,6 +97,7 @@ public class LegacySchemaTables
               + "default_time_to_live int,"
               + "default_validator text,"
               + "dropped_columns map<text, bigint>,"
+              + "dropped_columns_types map<text, text>,"
               + "gc_grace_seconds int,"
               + "is_dense boolean,"
               + "key_validator text,"
@@ -207,29 +205,37 @@ public class LegacySchemaTables
 
     public static Collection<KSMetaData> readSchemaFromSystemTables()
     {
-        List<Row> serializedSchema = getSchemaPartitionsForTable(KEYSPACES);
+        ReadCommand cmd = getReadCommandForTableSchema(KEYSPACES);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
+        {
+            List<KSMetaData> keyspaces = new ArrayList<>();
 
-        List<KSMetaData> keyspaces = new ArrayList<>(serializedSchema.size());
+            while (schema.hasNext())
+            {
+                try (RowIterator partition = schema.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
 
-        for (Row partition : serializedSchema)
-        {
-            if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
-                continue;
 
+                    DecoratedKey key = partition.partitionKey();
-            keyspaces.add(createKeyspaceFromSchemaPartitions(partition,
-                                                             readSchemaPartitionForKeyspace(COLUMNFAMILIES, partition.key),
-                                                             readSchemaPartitionForKeyspace(USERTYPES, partition.key)));
+                    readSchemaPartitionForKeyspaceAndApply(USERTYPES, key,
+                        types -> readSchemaPartitionForKeyspaceAndApply(COLUMNFAMILIES, key, tables -> keyspaces.add(createKeyspaceFromSchemaPartitions(partition, tables, types)))
+                    );
 
-            // Will be moved away in #6717
-            for (UDFunction function : createFunctionsFromFunctionsPartition(readSchemaPartitionForKeyspace(FUNCTIONS, partition.key)).values())
-                org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function);
+                    // Will be moved away in #6717
+                    readSchemaPartitionForKeyspaceAndApply(FUNCTIONS, key,
+                        functions -> { createFunctionsFromFunctionsPartition(functions).forEach(function -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(function)); return null; }
+                    );
 
-            // Will be moved away in #6717
-            for (UDAggregate aggregate : createAggregatesFromAggregatesPartition(readSchemaPartitionForKeyspace(AGGREGATES, partition.key)).values())
-                org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate);
+                    // Will be moved away in #6717
+                    readSchemaPartitionForKeyspaceAndApply(AGGREGATES, key,
+                        aggregates -> { createAggregatesFromAggregatesPartition(aggregates).forEach(aggregate -> org.apache.cassandra.cql3.functions.Functions.addOrReplaceFunction(aggregate)); return null; }
+                    );
+                }
+            }
+            return keyspaces;
         }
-
-        return keyspaces;
     }
 
     public static void truncateSchemaTables()
@@ -262,18 +268,19 @@ public class LegacySchemaTables
 
         for (String table : ALL)
         {
-            for (Row partition : getSchemaPartitionsForTable(table))
+            ReadCommand cmd = getReadCommandForTableSchema(table);
+            try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); PartitionIterator schema = cmd.executeInternal(orderGroup))
             {
-                if (isEmptySchemaPartition(partition) || isSystemKeyspaceSchemaPartition(partition))
-                    continue;
-
-                // we want to digest only live columns
-                ColumnFamilyStore.removeDeletedColumnsOnly(partition.cf, Integer.MAX_VALUE, SecondaryIndexManager.nullUpdater);
-                partition.cf.purgeTombstones(Integer.MAX_VALUE);
-                partition.cf.updateDigest(digest);
+                while (schema.hasNext())
+                {
+                    try (RowIterator partition = schema.next())
+                    {
+                        if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                            RowIterators.digest(partition, digest);
+                    }
+                }
             }
         }
-
         return UUID.nameUUIDFromBytes(digest.digest());
     }
 
@@ -290,14 +297,10 @@ public class LegacySchemaTables
      * @param schemaTableName The name of the table responsible for part of the schema.
      * @return low-level schema representation
      */
-    private static List<Row> getSchemaPartitionsForTable(String schemaTableName)
+    private static ReadCommand getReadCommandForTableSchema(String schemaTableName)
     {
-        Token minToken = StorageService.getPartitioner().getMinimumToken();
-        return getSchemaCFS(schemaTableName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()),
-                                                           null,
-                                                           new IdentityQueryFilter(),
-                                                           Integer.MAX_VALUE,
-                                                           System.currentTimeMillis());
+        ColumnFamilyStore cfs = getSchemaCFS(schemaTableName);
+        return PartitionRangeReadCommand.allDataRead(cfs.metadata, FBUtilities.nowInSeconds());
     }
 
     public static Collection<Mutation> convertSchemaToMutations()
@@ -312,31 +315,45 @@ public class LegacySchemaTables
 
     private static void convertSchemaToMutations(Map<DecoratedKey, Mutation> mutationMap, String schemaTableName)
     {
-        for (Row partition : getSchemaPartitionsForTable(schemaTableName))
+        ReadCommand cmd = getReadCommandForTableSchema(schemaTableName);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
         {
-            if (isSystemKeyspaceSchemaPartition(partition))
-                continue;
-
-            Mutation mutation = mutationMap.get(partition.key);
-            if (mutation == null)
+            while (iter.hasNext())
             {
-                mutation = new Mutation(SystemKeyspace.NAME, partition.key.getKey());
-                mutationMap.put(partition.key, mutation);
-            }
+                try (UnfilteredRowIterator partition = iter.next())
+                {
+                    if (isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                        continue;
+
+                    DecoratedKey key = partition.partitionKey();
+                    Mutation mutation = mutationMap.get(key);
+                    if (mutation == null)
+                    {
+                        mutation = new Mutation(SystemKeyspace.NAME, key);
+                        mutationMap.put(key, mutation);
+                    }
 
-            mutation.add(partition.cf);
+                    mutation.add(UnfilteredRowIterators.toUpdate(partition));
+                }
+            }
         }
     }
 
-    private static Map<DecoratedKey, ColumnFamily> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
+    private static Map<DecoratedKey, FilteredPartition> readSchemaForKeyspaces(String schemaTableName, Set<String> keyspaceNames)
     {
-        Map<DecoratedKey, ColumnFamily> schema = new HashMap<>();
+        Map<DecoratedKey, FilteredPartition> schema = new HashMap<>();
 
         for (String keyspaceName : keyspaceNames)
        {
-            Row schemaEntity = readSchemaPartitionForKeyspace(schemaTableName, keyspaceName);
-            if (schemaEntity.cf != null)
-                schema.put(schemaEntity.key, schemaEntity.cf);
+            // We don't want to return the RowIterator directly because we should guarantee that this iterator
+            // will be closed, and putting it in a Map makes that harder/more awkward.
+            readSchemaPartitionForKeyspaceAndApply(schemaTableName, keyspaceName,
+                partition -> {
+                    if (!partition.isEmpty())
+                        schema.put(partition.partitionKey(), FilteredPartition.create(partition));
+                    return null;
+                }
+            );
         }
 
         return schema;
@@ -347,35 +364,46 @@ public class LegacySchemaTables
         return AsciiType.instance.fromString(ksName);
     }
 
-    private static Row readSchemaPartitionForKeyspace(String schemaTableName, String keyspaceName)
+    private static DecoratedKey getSchemaKSDecoratedKey(String ksName)
     {
-        DecoratedKey keyspaceKey = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
-        return readSchemaPartitionForKeyspace(schemaTableName, keyspaceKey);
+        return StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName));
     }
 
-    private static Row readSchemaPartitionForKeyspace(String schemaTableName, DecoratedKey keyspaceKey)
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, String keyspaceName, Function<RowIterator, T> fct)
     {
-        QueryFilter filter = QueryFilter.getIdentityFilter(keyspaceKey, schemaTableName, System.currentTimeMillis());
-        return new Row(keyspaceKey, getSchemaCFS(schemaTableName).getColumnFamily(filter));
+        return readSchemaPartitionForKeyspaceAndApply(schemaTableName, getSchemaKSDecoratedKey(keyspaceName), fct);
     }
 
-    private static Row readSchemaPartitionForTable(String schemaTableName, String keyspaceName, String tableName)
+    private static <T> T readSchemaPartitionForKeyspaceAndApply(String schemaTableName, DecoratedKey keyspaceKey, Function<RowIterator, T> fct)
     {
-        DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(keyspaceName));
         ColumnFamilyStore store = getSchemaCFS(schemaTableName);
-        Composite prefix = store.getComparator().make(tableName);
-        ColumnFamily cells = store.getColumnFamily(key, prefix, prefix.end(), false, Integer.MAX_VALUE, System.currentTimeMillis());
-        return new Row(key, cells);
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(store.metadata, nowInSec, keyspaceKey)
+                                                                                             .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
     }
 
-    private static boolean isEmptySchemaPartition(Row partition)
+    private static <T> T readSchemaPartitionForTableAndApply(String schemaTableName, String keyspaceName, String tableName, Function<RowIterator, T> fct)
     {
-        return partition.cf == null || (partition.cf.isMarkedForDelete() && !partition.cf.hasColumns());
+        ColumnFamilyStore store = getSchemaCFS(schemaTableName);
+
+        ClusteringComparator comparator = store.metadata.comparator;
+        Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (OpOrder.Group op = store.readOrdering.start();
+             RowIterator partition = UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSDecoratedKey(keyspaceName), slices)
+                                                                                              .queryMemtableAndDisk(store, op), nowInSec))
+        {
+            return fct.apply(partition);
+        }
     }
 
-    private static boolean isSystemKeyspaceSchemaPartition(Row partition)
+    private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey)
     {
-        return getSchemaKSKey(SystemKeyspace.NAME).equals(partition.key.getKey());
+        return getSchemaKSKey(SystemKeyspace.NAME).equals(partitionKey.getKey());
     }
 
     /**
@@ -398,14 +426,14 @@ public class LegacySchemaTables
         // compare before/after schemas of the affected keyspaces only
         Set<String> keyspaces = new HashSet<>(mutations.size());
         for (Mutation mutation : mutations)
-            keyspaces.add(ByteBufferUtil.string(mutation.key()));
+            keyspaces.add(ByteBufferUtil.string(mutation.key().getKey()));
 
         // current state of the schema
-        Map<DecoratedKey, ColumnFamily> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, ColumnFamily> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> oldAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
 
         for (Mutation mutation : mutations)
             mutation.apply();
@@ -414,11 +442,11 @@ public class LegacySchemaTables
             flushSchemaTables();
 
         // with new data applied
-        Map<DecoratedKey, ColumnFamily> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
-        Map<DecoratedKey, ColumnFamily> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newKeyspaces = readSchemaForKeyspaces(KEYSPACES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newColumnFamilies = readSchemaForKeyspaces(COLUMNFAMILIES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newTypes = readSchemaForKeyspaces(USERTYPES, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newFunctions = readSchemaForKeyspaces(FUNCTIONS, keyspaces);
+        Map<DecoratedKey, FilteredPartition> newAggregates = readSchemaForKeyspaces(AGGREGATES, keyspaces);
 
         Set<String> keyspacesToDrop = mergeKeyspaces(oldKeyspaces, newKeyspaces);
         mergeTables(oldColumnFamilies, newColumnFamilies);
@@ -431,263 +459,187 @@ public class LegacySchemaTables
             Schema.instance.dropKeyspace(keyspaceToDrop);
     }
 
-    private static Set<String> mergeKeyspaces(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static Set<String> mergeKeyspaces(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<Row> created = new ArrayList<>();
-        List<String> altered = new ArrayList<>();
-        Set<String> dropped = new HashSet<>();
-
-        /*
-         * - we don't care about entriesOnlyOnLeft() or entriesInCommon(), because only the changes are of interest to us
-         * - of all entriesOnlyOnRight(), we only care about ones that have live columns; it's possible to have a ColumnFamily
-         *   there that only has the top-level deletion, if:
-         *      a) a pushed DROP KEYSPACE change for a keyspace hadn't ever made it to this node in the first place
-         *      b) a pulled dropped keyspace that got dropped before it could find a way to this node
-         * - of entriesDiffering(), we don't care about the scenario where both pre and post-values have zero live columns:
-         *   that means that a keyspace had been recreated and dropped, and the recreated keyspace had never found a way
-         *   to this node
-         */
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.add(new Row(entry.getKey(), entry.getValue()));
-
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        for (FilteredPartition newPartition : after.values())
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
-                altered.add(keyspaceName);
-            else if (pre.hasColumns())
-                dropped.add(keyspaceName);
-            else if (post.hasColumns()) // a (re)created keyspace
-                created.add(new Row(entry.getKey(), post));
+            FilteredPartition oldPartition = before.remove(newPartition.partitionKey());
+            if (oldPartition == null || oldPartition.isEmpty())
+            {
+                Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(newPartition.rowIterator()));
+            }
+            else
+            {
+                String name = AsciiType.instance.compose(newPartition.partitionKey().getKey());
+                Schema.instance.updateKeyspace(name);
+            }
         }
 
-        for (Row row : created)
-            Schema.instance.addKeyspace(createKeyspaceFromSchemaPartition(row));
-        for (String name : altered)
-            Schema.instance.updateKeyspace(name);
-        return dropped;
+        // What remains in 'before' are the keyspaces that are not in 'after', i.e. the dropped ones.
+        return asKeyspaceNamesSet(before.keySet());
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeTables(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static Set<String> asKeyspaceNamesSet(Set<DecoratedKey> keys)
     {
-        List<CFMetaData> created = new ArrayList<>();
-        List<CFMetaData> altered = new ArrayList<>();
-        List<CFMetaData> dropped = new ArrayList<>();
-
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), entry.getValue())).values());
+        Set<String> names = new HashSet<>(keys.size());
+        for (DecoratedKey key : keys)
+            names.add(AsciiType.instance.compose(key.getKey()));
+        return names;
+    }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+    private static void mergeTables(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<String, CFMetaData> delta =
-                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).cfMetaData(),
-                                    createTablesFromTablesPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<CFMetaData>, CFMetaData>()
-                {
-                    public CFMetaData apply(MapDifference.ValueDifference<CFMetaData> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                Schema.instance.dropTable(oldRow.getString("keyspace_name"), oldRow.getString("columnfamily_name"));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).cfMetaData().values());
+                Schema.instance.addTable(createTableFromTableRow(newRow));
             }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createTablesFromTablesPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateTable(newRow.getString("keyspace_name"), newRow.getString("columnfamily_name"));
             }
-        }
-
-        for (CFMetaData cfm : created)
-            Schema.instance.addTable(cfm);
-        for (CFMetaData cfm : altered)
-            Schema.instance.updateTable(cfm.ksName, cfm.cfName);
-        for (CFMetaData cfm : dropped)
-            Schema.instance.dropTable(cfm.ksName, cfm.cfName);
+        });
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeTypes(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void mergeTypes(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<UserType> created = new ArrayList<>();
-        List<UserType> altered = new ArrayList<>();
-        List<UserType> dropped = new ArrayList<>();
-
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
-
-        // New keyspace with types
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createTypesFromPartition(new Row(entry.getKey(), entry.getValue())).values());
-
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+        diffSchema(before, after, new Differ()
         {
-            String keyspaceName = AsciiType.instance.compose(entry.getKey().getKey());
-
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<ByteBuffer, UserType> delta =
-                    Maps.difference(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes(),
-                                    createTypesFromPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UserType>, UserType>()
-                {
-                    public UserType apply(MapDifference.ValueDifference<UserType> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                Schema.instance.dropType(createTypeFromRow(oldRow));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(Schema.instance.getKSMetaData(keyspaceName).userTypes.getAllTypes().values());
+                Schema.instance.addType(createTypeFromRow(newRow));
            }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createTypesFromPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateType(createTypeFromRow(newRow));
             }
-        }
-
-        for (UserType type : created)
-            Schema.instance.addType(type);
-        for (UserType type : altered)
-            Schema.instance.updateType(type);
-        for (UserType type : dropped)
-            Schema.instance.dropType(type);
+        });
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void mergeFunctions(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
     {
-        List<UDFunction> created = new ArrayList<>();
-        List<UDFunction> altered = new ArrayList<>();
-        List<UDFunction> dropped = new ArrayList<>();
+        diffSchema(before, after, new Differ()
+        {
+            public void onDropped(UntypedResultSet.Row oldRow)
+            {
+                Schema.instance.dropFunction(createFunctionFromFunctionRow(oldRow));
+            }
 
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+            public void onAdded(UntypedResultSet.Row newRow)
+            {
+                Schema.instance.addFunction(createFunctionFromFunctionRow(newRow));
+            }
 
-        // New keyspace with functions
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), entry.getValue())).values());
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
+            {
+                Schema.instance.updateFunction(createFunctionFromFunctionRow(newRow));
+            }
+        });
+    }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
+    private static void mergeAggregates(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after)
+    {
+        diffSchema(before, after, new Differ()
         {
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
-
-            if (pre.hasColumns() && post.hasColumns())
+            public void onDropped(UntypedResultSet.Row oldRow)
             {
-                MapDifference<ByteBuffer, UDFunction> delta =
-                    Maps.difference(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)),
-                                    createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)));
-
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDFunction>, UDFunction>()
+                Schema.instance.dropAggregate(createAggregateFromAggregateRow(oldRow));
             }
-            else if (pre.hasColumns())
+
+            public void onAdded(UntypedResultSet.Row newRow)
             {
-                dropped.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), pre)).values());
+                Schema.instance.addAggregate(createAggregateFromAggregateRow(newRow));
             }
-            else if (post.hasColumns())
+
+            public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow)
             {
-                created.addAll(createFunctionsFromFunctionsPartition(new Row(entry.getKey(), post)).values());
+                Schema.instance.updateAggregate(createAggregateFromAggregateRow(newRow));
            }
-        }
+        });
+    }
 
-        for (UDFunction udf : created)
-            Schema.instance.addFunction(udf);
-        for (UDFunction udf : altered)
-            Schema.instance.updateFunction(udf);
-        for (UDFunction udf : dropped)
-            Schema.instance.dropFunction(udf);
+    public interface Differ
+    {
+        public void onDropped(UntypedResultSet.Row oldRow);
+        public void onAdded(UntypedResultSet.Row newRow);
+        public void onUpdated(UntypedResultSet.Row oldRow, UntypedResultSet.Row newRow);
     }
 
-    // see the comments for mergeKeyspaces()
-    private static void mergeAggregates(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)
+    private static void diffSchema(Map<DecoratedKey, FilteredPartition> before, Map<DecoratedKey, FilteredPartition> after, Differ differ)
     {
-        List<UDAggregate> created = new ArrayList<>();
-        List<UDAggregate> altered = new ArrayList<>();
-        List<UDAggregate> dropped = new ArrayList<>();
+        for (FilteredPartition newPartition : after.values())
+        {
+            CFMetaData metadata = newPartition.metadata();
+            DecoratedKey key = newPartition.partitionKey();
 
-        MapDifference<DecoratedKey, ColumnFamily> diff = Maps.difference(before, after);
+            FilteredPartition oldPartition = before.remove(key);
 
-        // New keyspace with functions
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : diff.entriesOnlyOnRight().entrySet())
-            if (entry.getValue().hasColumns())
-                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), entry.getValue())).values());
+            if (oldPartition == null || oldPartition.isEmpty())
+            {
+                // Means everything is to be added
+                for (Row row : newPartition)
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, row));
+                continue;
+            }
 
-        for (Map.Entry<DecoratedKey, MapDifference.ValueDifference<ColumnFamily>> entry : diff.entriesDiffering().entrySet())
-        {
-            ColumnFamily pre = entry.getValue().leftValue();
-            ColumnFamily post = entry.getValue().rightValue();
+            Iterator<Row> oldIter = oldPartition.iterator();
+            Iterator<Row> newIter = newPartition.iterator();
 
-            if (pre.hasColumns() && post.hasColumns())
+            Row oldRow = oldIter.hasNext() ? oldIter.next() : null;
+            Row newRow = newIter.hasNext() ? newIter.next() : null;
+            while (oldRow != null && newRow != null)
             {
-                MapDifference<ByteBuffer, UDAggregate> delta =
-                    Maps.difference(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)),
-                                    createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)));
+                int cmp = metadata.comparator.compare(oldRow.clustering(), newRow.clustering());
+                if (cmp < 0)
+                {
+                    differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                }
+                else if (cmp > 0)
+                {
-                dropped.addAll(delta.entriesOnlyOnLeft().values());
-                created.addAll(delta.entriesOnlyOnRight().values());
-                Iterables.addAll(altered, Iterables.transform(delta.entriesDiffering().values(), new Function<MapDifference.ValueDifference<UDAggregate>, UDAggregate>()
+                    differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
+                else
                {
-                    public UDAggregate apply(MapDifference.ValueDifference<UDAggregate> pair)
-                    {
-                        return pair.rightValue();
-                    }
-                }));
+                    if (!oldRow.equals(newRow))
+                        differ.onUpdated(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow), UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+
+                    oldRow = oldIter.hasNext() ? oldIter.next() : null;
+                    newRow = newIter.hasNext() ? newIter.next() : null;
+                }
            }
-            else if (pre.hasColumns())
+
+            while (oldRow != null)
             {
-                dropped.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), pre)).values());
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(metadata, key, oldRow));
+                oldRow = oldIter.hasNext() ? oldIter.next() : null;
            }
-            else if (post.hasColumns())
+            while (newRow != null)
            {
-                created.addAll(createAggregatesFromAggregatesPartition(new Row(entry.getKey(), post)).values());
+                differ.onAdded(UntypedResultSet.Row.fromInternalRow(metadata, key, newRow));
+                newRow = newIter.hasNext() ? newIter.next() : null;
            }
        }
 
-        for (UDAggregate udf : created)
-            Schema.instance.addAggregate(udf);
-        for (UDAggregate udf : altered)
-            Schema.instance.updateAggregate(udf);
-        for (UDAggregate udf : dropped)
-            Schema.instance.dropAggregate(udf);
+        // What remains are the keys that were only in 'before'.
+        for (FilteredPartition partition : before.values())
+            for (Row row : partition)
+                differ.onDropped(UntypedResultSet.Row.fromInternalRow(partition.metadata(), partition.partitionKey(), row));
     }
 
     /*
@@ -701,14 +653,15 @@ public class LegacySchemaTables
 
     private static Mutation makeCreateKeyspaceMutation(KSMetaData keyspace, long timestamp, boolean withTablesAndTypesAndFunctions)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
-        ColumnFamily cells = mutation.addOrGet(Keyspaces);
-        CFRowAdder adder = new CFRowAdder(cells, Keyspaces.comparator.builder().build(), timestamp);
+        // Note that because Keyspaces is a COMPACT TABLE, we're really only setting static columns internally and shouldn't set any clustering.
+        RowUpdateBuilder adder = new RowUpdateBuilder(Keyspaces, timestamp, keyspace.name);
 
         adder.add("durable_writes", keyspace.durableWrites);
         adder.add("strategy_class", keyspace.strategyClass.getName());
         adder.add("strategy_options", json(keyspace.strategyOptions));
 
+        Mutation mutation = adder.build();
+
         if (withTablesAndTypesAndFunctions)
         {
             for (UserType type : keyspace.userTypes.getAllTypes().values())
@@ -723,36 +676,39 @@ public class LegacySchemaTables
 
     public static Mutation makeDropKeyspaceMutation(KSMetaData keyspace, long timestamp)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSKey(keyspace.name));
-        for (String schemaTable : ALL)
-            mutation.delete(schemaTable, timestamp);
-        mutation.delete(SystemKeyspace.BUILT_INDEXES, timestamp);
+        int nowInSec = FBUtilities.nowInSeconds();
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, getSchemaKSDecoratedKey(keyspace.name));
+        for (CFMetaData schemaTable : All)
+            mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
+        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltIndexes, mutation.key(), timestamp, nowInSec));
         return mutation;
     }
 
-    private static KSMetaData createKeyspaceFromSchemaPartitions(Row serializedKeyspace, Row serializedTables, Row serializedTypes)
+    private static KSMetaData createKeyspaceFromSchemaPartitions(RowIterator serializedKeyspace, RowIterator serializedTables, RowIterator serializedTypes)
     {
-        Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables).values();
+        Collection<CFMetaData> tables = createTablesFromTablesPartition(serializedTables);
         UTMetaData types = new UTMetaData(createTypesFromPartition(serializedTypes));
         return createKeyspaceFromSchemaPartition(serializedKeyspace).cloneWith(tables, types);
     }
 
     public static KSMetaData createKeyspaceFromName(String keyspace)
     {
-        Row partition = readSchemaPartitionForKeyspace(KEYSPACES, keyspace);
-
-        if (isEmptySchemaPartition(partition))
-            throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
+        return readSchemaPartitionForKeyspaceAndApply(KEYSPACES, keyspace, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s not found in the schema definitions keyspaceName (%s).", keyspace, KEYSPACES));
 
-        return createKeyspaceFromSchemaPartition(partition);
+            return createKeyspaceFromSchemaPartition(partition);
+        });
     }
 
+
     /**
      * Deserialize only Keyspace attributes without nested tables or types
     *
      * @param partition Keyspace attributes in serialized form
     */
-    private static KSMetaData createKeyspaceFromSchemaPartition(Row partition)
+    private static KSMetaData createKeyspaceFromSchemaPartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, KEYSPACES);
         UntypedResultSet.Row row = QueryProcessor.resultify(query, partition).one();
@@ -776,10 +732,8 @@ public class LegacySchemaTables
 
     private static void addTypeToSchemaMutation(UserType type, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Usertypes);
-
-        Composite prefix = Usertypes.comparator.make(type.name);
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Usertypes, timestamp, mutation)
+                                 .clustering(type.name);
 
         adder.resetCollection("field_names");
         adder.resetCollection("field_types");
@@ -789,23 +743,18 @@ public class LegacySchemaTables
             adder.addListEntry("field_names", type.fieldName(i));
             adder.addListEntry("field_types", type.fieldType(i).toString());
         }
+
+        adder.build();
     }
 
     public static Mutation dropTypeFromSchemaMutation(KSMetaData keyspace, UserType type, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
-
-        ColumnFamily cells = mutation.addOrGet(Usertypes);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Usertypes.comparator.make(type.name);
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
-
-        return mutation;
+        return RowUpdateBuilder.deleteRow(Usertypes, timestamp, mutation, type.name);
     }
 
-    private static Map<ByteBuffer, UserType> createTypesFromPartition(Row partition)
+    private static Map<ByteBuffer, UserType> createTypesFromPartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, USERTYPES);
         Map<ByteBuffer, UserType> types = new HashMap<>();
@@ -851,12 +800,11 @@ public class LegacySchemaTables
     {
         // For property that can be null (and can be changed), we insert tombstones, to make sure
         // we don't keep a property the user has removed
-        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
-        Composite prefix = Columnfamilies.comparator.make(table.cfName);
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columnfamilies, timestamp, mutation)
+                                 .clustering(table.cfName);
 
         adder.add("cf_id", table.cfId);
-        adder.add("type", table.cfType.toString());
+        adder.add("type", table.isSuper() ? "Super" : "Standard");
 
         if (table.isSuper())
         {
@@ -864,11 +812,11 @@ public class LegacySchemaTables
             // we won't know at deserialization if the subcomparator should be taken into account
             // TODO: we should implement an on-start migration if we want to get rid of that.
adder.add("comparator", table.comparator.subtype(0).toString()); - adder.add("subcomparator", table.comparator.subtype(1).toString()); + adder.add("subcomparator", ((MapType)table.compactValueColumn().type).getKeysType().toString()); } else { - adder.add("comparator", table.comparator.toString()); + adder.add("comparator", LegacyLayout.makeLegacyComparator(table).toString()); } adder.add("bloom_filter_fp_chance", table.getBloomFilterFpChance()); @@ -878,7 +826,6 @@ public class LegacySchemaTables adder.add("compaction_strategy_options", json(table.compactionStrategyOptions)); adder.add("compression_parameters", json(table.compressionParameters.asThriftOptions())); adder.add("default_time_to_live", table.getDefaultTimeToLive()); - adder.add("default_validator", table.getDefaultValidator().toString()); adder.add("gc_grace_seconds", table.getGcGraceSeconds()); adder.add("key_validator", table.getKeyValidator().toString()); adder.add("local_read_repair_chance", table.getDcLocalReadRepairChance()); @@ -890,10 +837,18 @@ public class LegacySchemaTables adder.add("read_repair_chance", table.getReadRepairChance()); adder.add("speculative_retry", table.getSpeculativeRetry().toString()); - for (Map.Entry<ColumnIdentifier, Long> entry : table.getDroppedColumns().entrySet()) - adder.addMapEntry("dropped_columns", entry.getKey().toString(), entry.getValue()); + for (Map.Entry<ColumnIdentifier, CFMetaData.DroppedColumn> entry : table.getDroppedColumns().entrySet()) + { + String name = entry.getKey().toString(); + CFMetaData.DroppedColumn column = entry.getValue(); + adder.addMapEntry("dropped_columns", name, column.droppedTime); + if (column.type != null) + adder.addMapEntry("dropped_columns_types", name, column.type.toString()); + } + + adder.add("is_dense", table.isDense()); - adder.add("is_dense", table.getIsDense()); + adder.add("default_validator", table.makeLegacyDefaultValidator().toString()); if (withColumnsAndTriggers) { @@ -903,6 +858,8 @@ public class LegacySchemaTables for (TriggerDefinition trigger : table.getTriggers().values()) addTriggerToSchemaMutation(table, trigger, timestamp, mutation); } + + adder.build(); } public static Mutation makeUpdateTableMutation(KSMetaData keyspace, @@ -955,11 +912,7 @@ public class LegacySchemaTables // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
         Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false);
 
-        ColumnFamily cells = mutation.addOrGet(Columnfamilies);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
-        Composite prefix = Columnfamilies.comparator.make(table.cfName);
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+        RowUpdateBuilder.deleteRow(Columnfamilies, timestamp, mutation, table.cfName);
 
         for (ColumnDefinition column : table.allColumns())
             dropColumnFromSchemaMutation(table, column, timestamp, mutation);
@@ -968,21 +921,21 @@ public class LegacySchemaTables
             dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
 
         // TODO: get rid of in #6717
-        ColumnFamily indexCells = mutation.addOrGet(SystemKeyspace.BuiltIndexes);
         for (String indexName : Keyspace.open(keyspace.name).getColumnFamilyStore(table.cfName).getBuiltIndexes())
-            indexCells.addTombstone(indexCells.getComparator().makeCellName(indexName), ldt, timestamp);
+            RowUpdateBuilder.deleteRow(SystemKeyspace.BuiltIndexes, timestamp, mutation, indexName);
 
         return mutation;
     }
 
     public static CFMetaData createTableFromName(String keyspace, String table)
     {
-        Row partition = readSchemaPartitionForTable(COLUMNFAMILIES, keyspace, table);
-
-        if (isEmptySchemaPartition(partition))
-            throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
+        return readSchemaPartitionForTableAndApply(COLUMNFAMILIES, keyspace, table, partition ->
+        {
+            if (partition.isEmpty())
+                throw new RuntimeException(String.format("%s:%s not found in the schema definitions keyspace.", keyspace, table));
 
-        return createTableFromTablePartition(partition);
+            return createTableFromTablePartition(partition);
+        });
     }
 
     /**
@@ -990,37 +943,34 @@ public class LegacySchemaTables
     *
     * @return map containing name of the table and its metadata for faster lookup
     */
-    private static Map<String, CFMetaData> createTablesFromTablesPartition(Row partition)
+    private static Collection<CFMetaData> createTablesFromTablesPartition(RowIterator partition)
    {
-        if (partition.cf == null)
-            return Collections.emptyMap();
+        if (partition.isEmpty())
+            return Collections.emptyList();
 
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
-        Map<String, CFMetaData> tables = new HashMap<>();
+        List<CFMetaData> tables = new ArrayList<>();
         for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-        {
-            CFMetaData cfm = createTableFromTableRow(row);
-            tables.put(cfm.cfName, cfm);
-        }
+            tables.add(createTableFromTableRow(row));
        return tables;
    }
 
-    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(Row serializedTable, Row serializedColumns)
+    public static CFMetaData createTableFromTablePartitionAndColumnsPartition(RowIterator serializedTable, RowIterator serializedColumns)
    {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
         return createTableFromTableRowAndColumnsPartition(QueryProcessor.resultify(query, serializedTable).one(), serializedColumns);
    }
 
-    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, Row serializedColumns)
+    private static CFMetaData createTableFromTableRowAndColumnsPartition(UntypedResultSet.Row tableRow, RowIterator serializedColumns)
    {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNS);
         return createTableFromTableRowAndColumnRows(tableRow, QueryProcessor.resultify(query, serializedColumns));
    }
 
-    private static CFMetaData createTableFromTablePartition(Row row)
+    private static CFMetaData createTableFromTablePartition(RowIterator partition)
     {
         String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, COLUMNFAMILIES);
-        return createTableFromTableRow(QueryProcessor.resultify(query, row).one());
+        return createTableFromTableRow(QueryProcessor.resultify(query, partition).one());
     }
 
     /**
@@ -1033,12 +983,11 @@ public class LegacySchemaTables
         String ksName = result.getString("keyspace_name");
         String cfName = result.getString("columnfamily_name");
 
-        Row serializedColumns = readSchemaPartitionForTable(COLUMNS, ksName, cfName);
-        CFMetaData cfm = createTableFromTableRowAndColumnsPartition(result, serializedColumns);
+        CFMetaData cfm = readSchemaPartitionForTableAndApply(COLUMNS, ksName, cfName, partition -> createTableFromTableRowAndColumnsPartition(result, partition));
 
-        Row serializedTriggers = readSchemaPartitionForTable(TRIGGERS, ksName, cfName);
-        for (TriggerDefinition trigger : createTriggersFromTriggersPartition(serializedTriggers))
-            cfm.addTriggerDefinition(trigger);
+        readSchemaPartitionForTableAndApply(TRIGGERS, ksName, cfName,
+            partition -> { createTriggersFromTriggersPartition(partition).forEach(trigger -> cfm.addTriggerDefinition(trigger)); return null; }
+        );
 
         return cfm;
     }
@@ -1051,35 +1000,47 @@ public class LegacySchemaTables
         AbstractType<?> rawComparator = TypeParser.parse(result.getString("comparator"));
         AbstractType<?> subComparator = result.has("subcomparator") ? TypeParser.parse(result.getString("subcomparator")) : null;
-        ColumnFamilyType cfType = ColumnFamilyType.valueOf(result.getString("type"));
-
-        AbstractType<?> fullRawComparator = CFMetaData.makeRawAbstractType(rawComparator, subComparator);
-
-        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
-                                                                        ksName,
-                                                                        cfName,
-                                                                        fullRawComparator,
-                                                                        cfType == ColumnFamilyType.Super);
 
-        boolean isDense = result.has("is_dense")
-                        ? result.getBoolean("is_dense")
-                        : CFMetaData.calculateIsDense(fullRawComparator, columnDefs);
+        boolean isSuper = result.getString("type").toLowerCase().equals("super");
+        boolean isDense = result.getBoolean("is_dense");
+        boolean isCompound = rawComparator instanceof CompositeType;
 
-        CellNameType comparator = CellNames.fromAbstractType(fullRawComparator, isDense);
+        // We don't really use the default validator but as we have it for backward compatibility, we use it to know if it's a counter table
+        AbstractType<?> defaultValidator = TypeParser.parse(result.getString("default_validator"));
+        boolean isCounter = defaultValidator instanceof CounterColumnType;
 
         // if we are upgrading, we use id generated from names initially
         UUID cfId = result.has("cf_id")
                   ? result.getUUID("cf_id")
                   : CFMetaData.generateLegacyCfId(ksName, cfName);
 
-        CFMetaData cfm = new CFMetaData(ksName, cfName, cfType, comparator, cfId);
-        cfm.isDense(isDense);
+        boolean isCQLTable = !isSuper && !isDense && isCompound;
+        boolean isStaticCompactTable = !isDense && !isCompound;
+
+        // Internally, compact tables have a specific layout, see CompactTables. But when upgrading from
+        // previous versions, they may not have the expected schema, so detect if we need to upgrade and do
+        // it in createColumnsFromColumnRows.
+        // We can remove this once we don't support upgrade from versions < 3.0.
+        boolean needsUpgrade = isCQLTable ? false : checkNeedsUpgrade(serializedColumnDefinitions, isSuper, isStaticCompactTable);
+
+        List<ColumnDefinition> columnDefs = createColumnsFromColumnRows(serializedColumnDefinitions,
+                                                                        ksName,
+                                                                        cfName,
+                                                                        rawComparator,
+                                                                        subComparator,
+                                                                        isSuper,
+                                                                        isCQLTable,
+                                                                        isStaticCompactTable,
+                                                                        needsUpgrade);
+
+        if (needsUpgrade)
+            addDefinitionForUpgrade(columnDefs, ksName, cfName, isStaticCompactTable, isSuper, rawComparator, subComparator, defaultValidator);
+
+        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
 
         cfm.readRepairChance(result.getDouble("read_repair_chance"));
         cfm.dcLocalReadRepairChance(result.getDouble("local_read_repair_chance"));
         cfm.gcGraceSeconds(result.getInt("gc_grace_seconds"));
-        cfm.defaultValidator(TypeParser.parse(result.getString("default_validator")));
-        cfm.keyValidator(TypeParser.parse(result.getString("key_validator")));
         cfm.minCompactionThreshold(result.getInt("min_compaction_threshold"));
         cfm.maxCompactionThreshold(result.getInt("max_compaction_threshold"));
         if (result.has("comment"))
@@ -1107,20 +1068,86 @@ public class LegacySchemaTables
             cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
 
         if (result.has("dropped_columns"))
-            cfm.droppedColumns(convertDroppedColumns(result.getMap("dropped_columns", UTF8Type.instance, LongType.instance)));
+        {
+            Map<String, String> types = result.has("dropped_columns_types")
+                                      ? result.getMap("dropped_columns_types", UTF8Type.instance, UTF8Type.instance)
+                                      : Collections.<String, String>emptyMap();
+            addDroppedColumns(cfm, result.getMap("dropped_columns", UTF8Type.instance, LongType.instance), types);
+        }
+
+        return cfm;
+    }
+
+    // Should only be called on compact tables
+    private static boolean checkNeedsUpgrade(UntypedResultSet defs, boolean isSuper, boolean isStaticCompactTable)
+    {
+        if (isSuper)
+        {
+            // Check if we've added the "supercolumn map" column yet or not
+            for (UntypedResultSet.Row row : defs)
+            {
+                if (row.getString("column_name").isEmpty())
+                    return false;
+            }
+            return true;
+        }
+
+        // For static compact tables, we need to upgrade if the regular definitions haven't been converted to static yet,
+        // i.e. if we don't have a static definition yet.
+        if (isStaticCompactTable)
+            return !hasKind(defs, ColumnDefinition.Kind.STATIC);
+
+        // For dense compact tables, we need to upgrade if we don't have a compact value definition
+        return !hasKind(defs, ColumnDefinition.Kind.REGULAR);
+    }
+
+    private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
+                                                String ksName,
+                                                String cfName,
+                                                boolean isStaticCompactTable,
+                                                boolean isSuper,
+                                                AbstractType<?> rawComparator,
+                                                AbstractType<?> subComparator,
+                                                AbstractType<?> defaultValidator)
+    {
+        CompactTables.DefaultNames names = CompactTables.defaultNameGenerator(defs);
 
-        for (ColumnDefinition cd : columnDefs)
-            cfm.addOrReplaceColumnDefinition(cd);
+        if (isSuper)
+        {
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true), null));
+        }
+        else if (isStaticCompactTable)
+        {
+            defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, names.defaultClusteringName(), rawComparator, null));
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator, null));
+        }
+        else
+        {
+            // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it
+            // (we use EmptyType to recognize that the compact value was not declared by the user (see CreateTableStatement too))
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance, null));
+        }
+    }
 
-        return cfm.rebuild();
+    private static boolean hasKind(UntypedResultSet defs, ColumnDefinition.Kind kind)
+    {
+        for (UntypedResultSet.Row row : defs)
+        {
+            if (deserializeKind(row.getString("type")) == kind)
+                return true;
+        }
+        return false;
     }
 
-    private static Map<ColumnIdentifier, Long> convertDroppedColumns(Map<String, Long> raw)
+    private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> droppedTimes, Map<String, String> types)
     {
-        Map<ColumnIdentifier, Long> converted = Maps.newHashMap();
-        for (Map.Entry<String, Long> entry : raw.entrySet())
-            converted.put(new ColumnIdentifier(entry.getKey(), true), entry.getValue());
-        return converted;
+        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
+        {
+            String name = entry.getKey();
+            long time = entry.getValue();
+            AbstractType<?> type = types.containsKey(name) ? TypeParser.parse(types.get(name)) : null;
+            cfm.getDroppedColumns().put(ColumnIdentifier.getInterned(name, true), new CFMetaData.DroppedColumn(type, time));
+        }
     }
 
     /*
@@ -1129,50 +1156,59 @@ public class LegacySchemaTables
 
     private static void addColumnToSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Columns);
-        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
-        CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Columns, timestamp, mutation)
+                                 .clustering(table.cfName, column.name.toString());
 
         adder.add("validator", column.type.toString());
-        adder.add("type", serializeKind(column.kind));
+        adder.add("type", serializeKind(column.kind, table.isDense()));
         adder.add("component_index", column.isOnAllComponents() ? null : column.position());
         adder.add("index_name", column.getIndexName());
        adder.add("index_type", column.getIndexType() == null ? null : column.getIndexType().toString());
         adder.add("index_options", json(column.getIndexOptions()));
+
+        adder.build();
     }
 
-    private static String serializeKind(ColumnDefinition.Kind kind)
+    private static String serializeKind(ColumnDefinition.Kind kind, boolean isDense)
     {
-        // For backward compatibility we need to special case CLUSTERING_COLUMN
-        return kind == ColumnDefinition.Kind.CLUSTERING_COLUMN ? "clustering_key" : kind.toString().toLowerCase();
+        // For backward compatibility, we special case CLUSTERING_COLUMN and the case where the table is dense.
+        if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+            return "clustering_key";
+
+        if (kind == ColumnDefinition.Kind.REGULAR && isDense)
+            return "compact_value";
+
+        return kind.toString().toLowerCase();
     }
 
-    private static ColumnDefinition.Kind deserializeKind(String kind)
+    public static ColumnDefinition.Kind deserializeKind(String kind)
     {
         if (kind.equalsIgnoreCase("clustering_key"))
             return ColumnDefinition.Kind.CLUSTERING_COLUMN;
+        if (kind.equalsIgnoreCase("compact_value"))
+            return ColumnDefinition.Kind.REGULAR;
         return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
     }
 
     private static void dropColumnFromSchemaMutation(CFMetaData table, ColumnDefinition column, long timestamp, Mutation mutation)
     {
-        ColumnFamily cells = mutation.addOrGet(Columns);
-        int ldt = (int) (System.currentTimeMillis() / 1000);
-
         // Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
-        Composite prefix = Columns.comparator.make(table.cfName, column.name.toString());
-        cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
+        RowUpdateBuilder.deleteRow(Columns, timestamp, mutation, table.cfName, column.name.toString());
     }
 
     private static List<ColumnDefinition> createColumnsFromColumnRows(UntypedResultSet rows,
                                                                       String keyspace,
                                                                       String table,
                                                                       AbstractType<?> rawComparator,
-                                                                      boolean isSuper)
+                                                                      AbstractType<?> rawSubComparator,
+                                                                      boolean isSuper,
+                                                                      boolean isCQLTable,
+                                                                      boolean isStaticCompactTable,
+                                                                      boolean needsUpgrade)
     {
         List<ColumnDefinition> columns = new ArrayList<>();
         for (UntypedResultSet.Row row : rows)
-            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, isSuper));
+            columns.add(createColumnFromColumnRow(row, keyspace, table, rawComparator, rawSubComparator, isSuper, isCQLTable, isStaticCompactTable, needsUpgrade));
         return columns;
     }
 
@@ -1180,22 +1216,26 @@ public class LegacySchemaTables
                                                               String keyspace,
                                                               String table,
                                                               AbstractType<?> rawComparator,
-                                                              boolean isSuper)
+                                                              AbstractType<?> rawSubComparator,
+                                                              boolean isSuper,
+                                                              boolean isCQLTable,
+                                                              boolean isStaticCompactTable,
+                                                              boolean needsUpgrade)
     {
         ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
+        if (needsUpgrade && isStaticCompactTable && kind == ColumnDefinition.Kind.REGULAR)
+            kind = ColumnDefinition.Kind.STATIC;
 
         Integer componentIndex = null;
         if (row.has("component_index"))
             componentIndex = row.getInt("component_index");
-        else if (kind == ColumnDefinition.Kind.CLUSTERING_COLUMN && isSuper)
-            componentIndex = 1; // A ColumnDefinition for super columns applies to the column component
 
         // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we
         // we need to use the comparator fromString method
-        AbstractType<?> comparator = kind == ColumnDefinition.Kind.REGULAR
-                                   ? getComponentComparator(rawComparator, componentIndex)
getComponentComparator(rawComparator, componentIndex) - : UTF8Type.instance; - ColumnIdentifier name = new ColumnIdentifier(comparator.fromString(row.getString("column_name")), comparator); + AbstractType<?> comparator = isCQLTable + ? UTF8Type.instance + : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator); + ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator); AbstractType<?> validator = parseType(row.getString("validator")); @@ -1214,32 +1254,21 @@ public class LegacySchemaTables return new ColumnDefinition(keyspace, table, name, validator, indexType, indexOptions, indexName, componentIndex, kind); } - private static AbstractType<?> getComponentComparator(AbstractType<?> rawComparator, Integer componentIndex) - { - return (componentIndex == null || (componentIndex == 0 && !(rawComparator instanceof CompositeType))) - ? rawComparator - : ((CompositeType)rawComparator).types.get(componentIndex); - } - /* * Trigger metadata serialization/deserialization. */ private static void addTriggerToSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation) { - ColumnFamily cells = mutation.addOrGet(Triggers); - Composite prefix = Triggers.comparator.make(table.cfName, trigger.name); - CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); - adder.addMapEntry("trigger_options", "class", trigger.classOption); + new RowUpdateBuilder(Triggers, timestamp, mutation) + .clustering(table.cfName, trigger.name) + .addMapEntry("trigger_options", "class", trigger.classOption) + .build(); } private static void dropTriggerFromSchemaMutation(CFMetaData table, TriggerDefinition trigger, long timestamp, Mutation mutation) { - ColumnFamily cells = mutation.addOrGet(Triggers); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = Triggers.comparator.make(table.cfName, trigger.name); - cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); + RowUpdateBuilder.deleteRow(Triggers, timestamp, mutation, table.cfName, trigger.name); } /** @@ -1248,7 +1277,7 @@ public class LegacySchemaTables * @param partition storage-level partition containing the trigger definitions * @return the list of processed TriggerDefinitions */ - private static List<TriggerDefinition> createTriggersFromTriggersPartition(Row partition) + private static List<TriggerDefinition> createTriggersFromTriggersPartition(RowIterator partition) { List<TriggerDefinition> triggers = new ArrayList<>(); String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, TRIGGERS); @@ -1275,48 +1304,37 @@ public class LegacySchemaTables private static void addFunctionToSchemaMutation(UDFunction function, long timestamp, Mutation mutation) { - ColumnFamily cells = mutation.addOrGet(Functions); - Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function)); - CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + RowUpdateBuilder adder = new RowUpdateBuilder(Functions, timestamp, mutation) + .clustering(function.name().name, functionSignatureWithTypes(function)); + + adder.add("body", function.body()); + adder.add("language", function.language()); + adder.add("return_type", function.returnType().toString()); + adder.add("called_on_null_input", function.isCalledOnNullInput()); adder.resetCollection("argument_names"); adder.resetCollection("argument_types"); - for (int i = 0; i < function.argNames().size(); i++) { 
adder.addListEntry("argument_names", function.argNames().get(i).bytes); adder.addListEntry("argument_types", function.argTypes().get(i).toString()); } - - adder.add("body", function.body()); - adder.add("language", function.language()); - adder.add("return_type", function.returnType().toString()); - adder.add("called_on_null_input", function.isCalledOnNullInput()); + adder.build(); } public static Mutation makeDropFunctionMutation(KSMetaData keyspace, UDFunction function, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); - - ColumnFamily cells = mutation.addOrGet(Functions); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = Functions.comparator.make(function.name().name, functionSignatureWithTypes(function)); - cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - return mutation; + return RowUpdateBuilder.deleteRow(Functions, timestamp, mutation, function.name().name, functionSignatureWithTypes(function)); } - private static Map<ByteBuffer, UDFunction> createFunctionsFromFunctionsPartition(Row partition) + private static Collection<UDFunction> createFunctionsFromFunctionsPartition(RowIterator partition) { - Map<ByteBuffer, UDFunction> functions = new HashMap<>(); + List<UDFunction> functions = new ArrayList<>(); String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, FUNCTIONS); for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - { - UDFunction function = createFunctionFromFunctionRow(row); - functions.put(functionSignatureWithNameAndTypes(function), function); - } + functions.add(createFunctionFromFunctionRow(row)); return functions; } @@ -1387,9 +1405,8 @@ public class LegacySchemaTables private static void addAggregateToSchemaMutation(UDAggregate aggregate, long timestamp, Mutation mutation) { - ColumnFamily cells = mutation.addOrGet(Aggregates); - Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate)); - CFRowAdder adder = new CFRowAdder(cells, prefix, timestamp); + RowUpdateBuilder adder = new RowUpdateBuilder(Aggregates, timestamp, mutation) + .clustering(aggregate.name().name, functionSignatureWithTypes(aggregate)); adder.resetCollection("argument_types"); adder.add("return_type", aggregate.returnType().toString()); @@ -1403,17 +1420,16 @@ public class LegacySchemaTables for (AbstractType<?> argType : aggregate.argTypes()) adder.addListEntry("argument_types", argType.toString()); + + adder.build(); } - private static Map<ByteBuffer, UDAggregate> createAggregatesFromAggregatesPartition(Row partition) + private static Collection<UDAggregate> createAggregatesFromAggregatesPartition(RowIterator partition) { - Map<ByteBuffer, UDAggregate> aggregates = new HashMap<>(); + List<UDAggregate> aggregates = new ArrayList<>(); String query = String.format("SELECT * FROM %s.%s", SystemKeyspace.NAME, AGGREGATES); for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - { - UDAggregate aggregate = createAggregateFromAggregateRow(row); - aggregates.put(functionSignatureWithNameAndTypes(aggregate), aggregate); - } + aggregates.add(createAggregateFromAggregateRow(row)); return aggregates; } @@ -1475,14 +1491,7 @@ public class LegacySchemaTables { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). 
Mutation mutation = makeCreateKeyspaceMutation(keyspace, timestamp, false); - - ColumnFamily cells = mutation.addOrGet(Aggregates); - int ldt = (int) (System.currentTimeMillis() / 1000); - - Composite prefix = Aggregates.comparator.make(aggregate.name().name, functionSignatureWithTypes(aggregate)); - cells.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt)); - - return mutation; + return RowUpdateBuilder.deleteRow(Aggregates, timestamp, mutation, aggregate.name().name, functionSignatureWithTypes(aggregate)); } private static AbstractType<?> parseType(String str) @@ -1515,5 +1524,4 @@ public class LegacySchemaTables strList.add(argType.asCQL3Type().toString()); return list.decompose(strList); } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/ListSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/ListSerializer.java b/src/java/org/apache/cassandra/serializers/ListSerializer.java index aeee2b9..b1c5508 100644 --- a/src/java/org/apache/cassandra/serializers/ListSerializer.java +++ b/src/java/org/apache/cassandra/serializers/ListSerializer.java @@ -143,14 +143,16 @@ public class ListSerializer<T> extends CollectionSerializer<List<T>> { StringBuilder sb = new StringBuilder(); boolean isFirst = true; + sb.append('['); for (T element : value) { if (isFirst) isFirst = false; else - sb.append("; "); + sb.append(", "); sb.append(elements.toString(element)); } + sb.append(']'); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/MapSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/MapSerializer.java b/src/java/org/apache/cassandra/serializers/MapSerializer.java index 8350f66..7d81598 100644 --- a/src/java/org/apache/cassandra/serializers/MapSerializer.java +++ b/src/java/org/apache/cassandra/serializers/MapSerializer.java @@ -84,7 +84,7 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> } catch (BufferUnderflowException e) { - throw new MarshalException("Not enough bytes to read a set"); + throw new MarshalException("Not enough bytes to read a map"); } } @@ -150,19 +150,19 @@ public class MapSerializer<K, V> extends CollectionSerializer<Map<K, V>> public String toString(Map<K, V> value) { StringBuilder sb = new StringBuilder(); + sb.append('{'); boolean isFirst = true; for (Map.Entry<K, V> element : value.entrySet()) { if (isFirst) isFirst = false; else - sb.append("; "); - sb.append('('); + sb.append(", "); sb.append(keys.toString(element.getKey())); - sb.append(", "); + sb.append(": "); sb.append(values.toString(element.getValue())); - sb.append(')'); } + sb.append('}'); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/serializers/SetSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/SetSerializer.java b/src/java/org/apache/cassandra/serializers/SetSerializer.java index 21f5075..7108630 100644 --- a/src/java/org/apache/cassandra/serializers/SetSerializer.java +++ b/src/java/org/apache/cassandra/serializers/SetSerializer.java @@ -94,13 +94,14 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>> } catch (BufferUnderflowException e) { - throw new MarshalException("Not enough 
bytes to read a list"); + throw new MarshalException("Not enough bytes to read a set"); } } public String toString(Set<T> value) { StringBuilder sb = new StringBuilder(); + sb.append('{'); boolean isFirst = true; for (T element : value) { @@ -110,10 +111,11 @@ public class SetSerializer<T> extends CollectionSerializer<Set<T>> } else { - sb.append("; "); + sb.append(", "); } sb.append(elements.toString(element)); } + sb.append('}'); return sb.toString(); }
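The new dropped_columns_types column in the schema tables complements dropped_columns: the first map records when a column was dropped, the second what type it had. Below is a minimal standalone sketch of the join that addDroppedColumns performs; DroppedColumn and parseType here are hypothetical stand-ins for CFMetaData.DroppedColumn and TypeParser.parse, and the plain-Map harness is not Cassandra code.

import java.util.HashMap;
import java.util.Map;

// Sketch of the join addDroppedColumns performs over the two schema maps.
public class DroppedColumnsSketch
{
    static final class DroppedColumn
    {
        final String type;      // null when the legacy schema recorded no type
        final long droppedTime;

        DroppedColumn(String type, long droppedTime)
        {
            this.type = type;
            this.droppedTime = droppedTime;
        }

        public String toString() { return "(" + type + ", " + droppedTime + ")"; }
    }

    static String parseType(String str) { return str; } // stand-in for TypeParser.parse

    static Map<String, DroppedColumn> join(Map<String, Long> droppedTimes, Map<String, String> types)
    {
        Map<String, DroppedColumn> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
        {
            String name = entry.getKey();
            // A type is present only for schemas written after this change
            String type = types.containsKey(name) ? parseType(types.get(name)) : null;
            result.put(name, new DroppedColumn(type, entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args)
    {
        Map<String, Long> times = new HashMap<>();
        times.put("c1", 12345L);
        Map<String, String> types = new HashMap<>();
        types.put("c1", "org.apache.cassandra.db.marshal.Int32Type");
        System.out.println(join(times, types)); // {c1=(org.apache.cassandra.db.marshal.Int32Type, 12345)}
    }
}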
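serializeKind/deserializeKind now have to round-trip two legacy spellings: clustering columns are stored as "clustering_key", and the value column of a dense compact table as "compact_value". A self-contained sketch of that round-trip follows; the Kind enum is trimmed to the values the patch touches, and the class is illustrative, not ColumnDefinition itself.

// Sketch of the kind <-> string round-trip in serializeKind/deserializeKind.
public class KindCodecSketch
{
    enum Kind { PARTITION_KEY, CLUSTERING_COLUMN, REGULAR, STATIC }

    static String serialize(Kind kind, boolean isDense)
    {
        // Pre-3.0 schema tables spell CLUSTERING_COLUMN as "clustering_key"...
        if (kind == Kind.CLUSTERING_COLUMN)
            return "clustering_key";
        // ...and the value column of a dense compact table as "compact_value"
        if (kind == Kind.REGULAR && isDense)
            return "compact_value";
        return kind.toString().toLowerCase();
    }

    static Kind deserialize(String kind)
    {
        if (kind.equalsIgnoreCase("clustering_key"))
            return Kind.CLUSTERING_COLUMN;
        // The alias collapses back to REGULAR; density is recovered elsewhere
        if (kind.equalsIgnoreCase("compact_value"))
            return Kind.REGULAR;
        return Enum.valueOf(Kind.class, kind.toUpperCase());
    }

    public static void main(String[] args)
    {
        for (Kind k : Kind.values())
        {
            // Round-trips for both dense and sparse tables (run with -ea)
            assert deserialize(serialize(k, false)) == k;
            assert deserialize(serialize(k, true)) == k;
        }
        System.out.println(serialize(Kind.REGULAR, true)); // compact_value
    }
}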
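The mechanical change repeated across the LegacySchemaTables hunks replaces the old ColumnFamily/Composite/CFRowAdder triple with RowUpdateBuilder: a fluent clustering(...).add(...).build() chain for upserts, and a single static deleteRow(...) that subsumes the manual RangeTombstone-plus-local-deletion-time dance. The toy builder below shows only that call shape; the Map-backed "mutation" and the class name are hypothetical stand-ins, since the real builder writes into a Cassandra Mutation.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy fluent builder mirroring RowUpdateBuilder's call shape.
public class RowUpdateSketch
{
    private final Map<List<Object>, Map<String, Object>> mutation;
    private final long timestamp; // kept to mirror the real signature; unused here
    private List<Object> clustering;
    private final Map<String, Object> cells = new LinkedHashMap<>();

    RowUpdateSketch(Map<List<Object>, Map<String, Object>> mutation, long timestamp)
    {
        this.mutation = mutation;
        this.timestamp = timestamp;
    }

    RowUpdateSketch clustering(Object... values)
    {
        this.clustering = Arrays.asList(values);
        return this; // fluent: every step returns the builder
    }

    RowUpdateSketch add(String column, Object value)
    {
        cells.put(column, value);
        return this;
    }

    void build()
    {
        mutation.put(clustering, cells); // one row per clustering prefix
    }

    static void deleteRow(Map<List<Object>, Map<String, Object>> mutation, long timestamp, Object... clustering)
    {
        mutation.put(Arrays.asList(clustering), null); // null marks a row tombstone
    }

    public static void main(String[] args)
    {
        Map<List<Object>, Map<String, Object>> mutation = new LinkedHashMap<>();
        // Same shape as addColumnToSchemaMutation after the patch
        new RowUpdateSketch(mutation, 42L)
            .clustering("table1", "col1")
            .add("validator", "UTF8Type")
            .add("type", "regular")
            .build();
        deleteRow(mutation, 43L, "table1", "col2");
        System.out.println(mutation); // {[table1, col1]={validator=UTF8Type, type=regular}, [table1, col2]=null}
    }
}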
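Finally, the serializer hunks switch toString from an ad-hoc semicolon-separated dump to CQL-literal syntax: [1, 2, 3] for lists, {1, 2} for sets, {k: v} for maps (and fix the copy-pasted list/set/map mix-ups in the BufferUnderflowException messages). The pattern, reduced to a standalone sketch: element formatting is simplified here, whereas the real serializers delegate to each element's own toString via the per-element serializer. Sets format exactly like lists but with braces.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the CQL-literal formatting the patch gives the collection serializers.
public class CqlLiteralSketch
{
    static String formatList(List<?> value)
    {
        StringBuilder sb = new StringBuilder().append('[');
        boolean isFirst = true;
        for (Object element : value)
        {
            if (isFirst) isFirst = false;
            else sb.append(", "); // was "; " before the patch
            sb.append(element);
        }
        return sb.append(']').toString();
    }

    static String formatMap(Map<?, ?> value)
    {
        StringBuilder sb = new StringBuilder().append('{');
        boolean isFirst = true;
        for (Map.Entry<?, ?> e : value.entrySet())
        {
            if (isFirst) isFirst = false;
            else sb.append(", ");
            sb.append(e.getKey()).append(": ").append(e.getValue()); // was "(k, v)" pairs
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args)
    {
        System.out.println(formatList(Arrays.asList(1, 2, 3))); // [1, 2, 3]
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("a", 1);
        m.put("b", 2);
        System.out.println(formatMap(m)); // {a: 1, b: 2}
    }
}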
