Merge branch 'cassandra-2.2' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/418c7936
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/418c7936
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/418c7936

Branch: refs/heads/trunk
Commit: 418c7936fb61ca3e385326bddb55ee4a81e97d53
Parents: c734cb8 fc202a7
Author: Robert Stupp <[email protected]>
Authored: Sun Jul 12 10:54:36 2015 +0200
Committer: Robert Stupp <[email protected]>
Committed: Sun Jul 12 10:54:36 2015 +0200

----------------------------------------------------------------------
 doc/cql3/CQL.textile                            |  4 +-
 .../statements/CreateAggregateStatement.java    | 13 ++----
 .../cassandra/schema/LegacySchemaMigrator.java  | 21 +--------
 .../apache/cassandra/schema/SchemaKeyspace.java | 16 ++-----
 .../validation/operations/AggregationTest.java  | 49 --------------------
 .../schema/LegacySchemaMigratorTest.java        |  4 +-
 6 files changed, 13 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/418c7936/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index 16d9fc5,1d73e3f..0bb13e5
--- 
a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@@ -88,9 -88,9 +88,9 @@@ public final class CreateAggregateState
          AbstractType<?> stateType = prepareType("state type", stateTypeRaw);
  
          List<AbstractType<?>> stateArgs = stateArguments(stateType, argTypes);
-         stateFunc = validateFunctionKeyspace(stateFunc, stateArgs);
+         stateFunc = validateFunctionKeyspace(stateFunc);
  
 -        Function f = Functions.find(stateFunc, stateArgs);
 +        Function f = Schema.instance.findFunction(stateFunc, 
stateArgs).orElse(null);
          if (!(f instanceof ScalarFunction))
              throw new InvalidRequestException("State function " + 
stateFuncSig(stateFunc, stateTypeRaw, argRawTypes) + " does not exist or is not 
a scalar function");
          stateFunction = (ScalarFunction)f;
@@@ -102,8 -102,8 +102,8 @@@
          if (finalFunc != null)
          {
              List<AbstractType<?>> finalArgs = 
Collections.<AbstractType<?>>singletonList(stateType);
-             finalFunc = validateFunctionKeyspace(finalFunc, finalArgs);
+             finalFunc = validateFunctionKeyspace(finalFunc);
 -            f = Functions.find(finalFunc, finalArgs);
 +            f = Schema.instance.findFunction(finalFunc, 
finalArgs).orElse(null);
              if (!(f instanceof ScalarFunction))
                  throw new InvalidRequestException("Final function " + 
finalFunc + '(' + stateTypeRaw + ") does not exist or is not a scalar 
function");
              finalFunction = (ScalarFunction) f;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/418c7936/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 996b5ff,0000000..dc9e168
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@@ -1,806 -1,0 +1,789 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.HashMultimap;
 +import com.google.common.collect.ImmutableList;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.functions.FunctionName;
 +import org.apache.cassandra.cql3.functions.UDAggregate;
 +import org.apache.cassandra.cql3.functions.UDFunction;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +import static java.lang.String.format;
 +import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 +import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
 +
 +/**
 + * This majestic class performs migration from legacy (pre-3.0) 
system.schema_* schema tables to the new and glorious
 + * system_schema keyspace.
 + *
 + * The goal is to not lose any information in the migration - including the 
timestamps.
 + */
 +@SuppressWarnings("deprecation")
 +public final class LegacySchemaMigrator
 +{
 +    private LegacySchemaMigrator()
 +    {
 +    }
 +
 +    private static final Logger logger = 
LoggerFactory.getLogger(LegacySchemaMigrator.class);
 +
 +    static final List<CFMetaData> LegacySchemaTables =
 +        ImmutableList.of(SystemKeyspace.LegacyKeyspaces,
 +                         SystemKeyspace.LegacyColumnfamilies,
 +                         SystemKeyspace.LegacyColumns,
 +                         SystemKeyspace.LegacyTriggers,
 +                         SystemKeyspace.LegacyUsertypes,
 +                         SystemKeyspace.LegacyFunctions,
 +                         SystemKeyspace.LegacyAggregates);
 +
 +    public static void migrate()
 +    {
 +        // read metadata from the legacy schema tables
 +        Collection<Keyspace> keyspaces = readSchema();
 +
 +        // if already upgraded, or starting a new 3.0 node, abort early
 +        if (keyspaces.isEmpty())
 +        {
 +            unloadLegacySchemaTables();
 +            return;
 +        }
 +
 +        // write metadata to the new schema tables
 +        logger.info("Moving {} keyspaces from legacy schema tables to the new 
schema keyspace ({})",
 +                    keyspaces.size(),
 +                    SchemaKeyspace.NAME);
 +        
keyspaces.forEach(LegacySchemaMigrator::storeKeyspaceInNewSchemaTables);
 +
 +        // flush the new tables before truncating the old ones
 +        SchemaKeyspace.flush();
 +
 +        // truncate the original tables (will be snapshotted now, and will 
have been snapshotted by pre-flight checks)
 +        logger.info("Truncating legacy schema tables");
 +        truncateLegacySchemaTables();
 +
 +        // remove legacy schema tables from Schema, so that their presence 
doesn't give the users any wrong ideas
 +        unloadLegacySchemaTables();
 +
 +        logger.info("Completed migration of legacy schema tables");
 +    }
 +
 +    static void unloadLegacySchemaTables()
 +    {
 +        KeyspaceMetadata systemKeyspace = 
Schema.instance.getKSMetaData(SystemKeyspace.NAME);
 +
 +        Tables systemTables = systemKeyspace.tables;
 +        for (CFMetaData table : LegacySchemaTables)
 +            systemTables = systemTables.without(table.cfName);
 +
 +        LegacySchemaTables.forEach(Schema.instance::unload);
 +
 +        
Schema.instance.setKeyspaceMetadata(systemKeyspace.withSwapped(systemTables));
 +    }
 +
 +    private static void truncateLegacySchemaTables()
 +    {
 +        LegacySchemaTables.forEach(table -> 
Schema.instance.getColumnFamilyStoreInstance(table.cfId).truncateBlocking());
 +    }
 +
 +    private static void storeKeyspaceInNewSchemaTables(Keyspace keyspace)
 +    {
 +        Mutation mutation = 
SchemaKeyspace.makeCreateKeyspaceMutation(keyspace.name, keyspace.params, 
keyspace.timestamp);
 +
 +        for (Table table : keyspace.tables)
 +            SchemaKeyspace.addTableToSchemaMutation(table.metadata, 
table.timestamp, true, mutation);
 +
 +        for (Type type : keyspace.types)
 +            SchemaKeyspace.addTypeToSchemaMutation(type.metadata, 
type.timestamp, mutation);
 +
 +        for (Function function : keyspace.functions)
 +            SchemaKeyspace.addFunctionToSchemaMutation(function.metadata, 
function.timestamp, mutation);
 +
 +        for (Aggregate aggregate : keyspace.aggregates)
 +            SchemaKeyspace.addAggregateToSchemaMutation(aggregate.metadata, 
aggregate.timestamp, mutation);
 +
 +        mutation.apply();
 +    }
 +
 +    /*
 +     * Read all keyspaces metadata (including nested tables, types, 
functions, and aggregates), with their modification timestamps
 +     */
 +    private static Collection<Keyspace> readSchema()
 +    {
 +        String query = format("SELECT keyspace_name FROM %s.%s", 
SystemKeyspace.NAME, SystemKeyspace.LEGACY_KEYSPACES);
 +        Collection<String> keyspaceNames = new ArrayList<>();
 +        query(query).forEach(row -> 
keyspaceNames.add(row.getString("keyspace_name")));
 +        keyspaceNames.removeAll(Schema.SYSTEM_KEYSPACE_NAMES);
 +
 +        Collection<Keyspace> keyspaces = new ArrayList<>();
 +        keyspaceNames.forEach(name -> keyspaces.add(readKeyspace(name)));
 +        return keyspaces;
 +    }
 +
 +    private static Keyspace readKeyspace(String keyspaceName)
 +    {
 +        long timestamp = readKeyspaceTimestamp(keyspaceName);
 +        KeyspaceParams params = readKeyspaceParams(keyspaceName);
 +
 +        Collection<Table> tables = readTables(keyspaceName);
 +        Collection<Type> types = readTypes(keyspaceName);
 +        Collection<Function> functions = readFunctions(keyspaceName);
 +        Collection<Aggregate> aggregates = readAggregates(keyspaceName);
 +
 +        return new Keyspace(timestamp, keyspaceName, params, tables, types, 
functions, aggregates);
 +    }
 +
 +    /*
 +     * Reading keyspace params
 +     */
 +
 +    private static long readKeyspaceTimestamp(String keyspaceName)
 +    {
 +        String query = format("SELECT writeTime(durable_writes) AS timestamp 
FROM %s.%s WHERE keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_KEYSPACES);
 +        return query(query, keyspaceName).one().getLong("timestamp");
 +    }
 +
 +    private static KeyspaceParams readKeyspaceParams(String keyspaceName)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_KEYSPACES);
 +        UntypedResultSet.Row row = query(query, keyspaceName).one();
 +
 +        boolean durableWrites = row.getBoolean("durable_writes");
 +
 +        Map<String, String> replication = new HashMap<>();
 +        replication.putAll(fromJsonMap(row.getString("strategy_options")));
 +        replication.put(KeyspaceParams.Replication.CLASS, 
row.getString("strategy_class"));
 +
 +        return KeyspaceParams.create(durableWrites, replication);
 +    }
 +
 +    /*
 +     * Reading tables
 +     */
 +
 +    private static Collection<Table> readTables(String keyspaceName)
 +    {
 +        String query = format("SELECT columnfamily_name FROM %s.%s WHERE 
keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        Collection<String> tableNames = new ArrayList<>();
 +        query(query, keyspaceName).forEach(row -> 
tableNames.add(row.getString("columnfamily_name")));
 +
 +        Collection<Table> tables = new ArrayList<>();
 +        tableNames.forEach(name -> tables.add(readTable(keyspaceName, name)));
 +        return tables;
 +    }
 +
 +    private static Table readTable(String keyspaceName, String tableName)
 +    {
 +        long timestamp = readTableTimestamp(keyspaceName, tableName);
 +        CFMetaData metadata = readTableMetadata(keyspaceName, tableName);
 +        return new Table(timestamp, metadata);
 +    }
 +
 +    private static long readTableTimestamp(String keyspaceName, String 
tableName)
 +    {
 +        String query = format("SELECT writeTime(type) AS timestamp FROM %s.%s 
WHERE keyspace_name = ? AND columnfamily_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        return query(query, keyspaceName, 
tableName).one().getLong("timestamp");
 +    }
 +
 +    private static CFMetaData readTableMetadata(String keyspaceName, String 
tableName)
 +    {
 +        String tableQuery = format("SELECT * FROM %s.%s WHERE keyspace_name = 
? AND columnfamily_name = ?",
 +                                   SystemKeyspace.NAME,
 +                                   SystemKeyspace.LEGACY_COLUMNFAMILIES);
 +        UntypedResultSet.Row tableRow = query(tableQuery, keyspaceName, 
tableName).one();
 +
 +        String columnsQuery = format("SELECT * FROM %s.%s WHERE keyspace_name 
= ? AND columnfamily_name = ?",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.LEGACY_COLUMNS);
 +        UntypedResultSet columnRows = query(columnsQuery, keyspaceName, 
tableName);
 +
 +        String triggersQuery = format("SELECT * FROM %s.%s WHERE 
keyspace_name = ? AND columnfamily_name = ?",
 +                                      SystemKeyspace.NAME,
 +                                      SystemKeyspace.LEGACY_TRIGGERS);
 +        UntypedResultSet triggerRows = query(triggersQuery, keyspaceName, 
tableName);
 +
 +        return decodeTableMetadata(tableRow, columnRows, triggerRows);
 +    }
 +
 +    private static CFMetaData decodeTableMetadata(UntypedResultSet.Row 
tableRow,
 +                                                  UntypedResultSet columnRows,
 +                                                  UntypedResultSet 
triggerRows)
 +    {
 +        String ksName = tableRow.getString("keyspace_name");
 +        String cfName = tableRow.getString("columnfamily_name");
 +
 +        AbstractType<?> rawComparator = 
TypeParser.parse(tableRow.getString("comparator"));
 +        AbstractType<?> subComparator = tableRow.has("subcomparator") ? 
TypeParser.parse(tableRow.getString("subcomparator")) : null;
 +
 +        boolean isSuper = 
"super".equals(tableRow.getString("type").toLowerCase());
 +        boolean isDense = tableRow.getBoolean("is_dense");
 +        boolean isCompound = rawComparator instanceof CompositeType;
 +
 +        // We don't really use the default validator but as we have it for 
backward compatibility, we use it to know if it's a counter table
 +        AbstractType<?> defaultValidator = 
TypeParser.parse(tableRow.getString("default_validator"));
 +        boolean isCounter = defaultValidator instanceof CounterColumnType;
 +
 +        /*
 +         * With CASSANDRA-5202 we stopped inferring the cf id from the 
combination of keyspace/table names,
 +         * and started storing the generated uuids in 
system.schema_columnfamilies.
 +         *
 +         * In 3.0 we SHOULD NOT see tables like that (2.0-created, 
non-upgraded).
 +         * But on the off chance that we do, we generate the deterministic 
uuid here.
 +         */
 +        UUID cfId = tableRow.has("cf_id")
 +                  ? tableRow.getUUID("cf_id")
 +                  : CFMetaData.generateLegacyCfId(ksName, cfName);
 +
 +        boolean isCQLTable = !isSuper && !isDense && isCompound;
 +        boolean isStaticCompactTable = !isDense && !isCompound;
 +
 +        // Internally, compact tables have a specific layout, see 
CompactTables. But when upgrading from
 +        // previous versions, they may not have the expected schema, so 
detect if we need to upgrade and do
 +        // it in createColumnsFromColumnRows.
 +        // We can remove this once we don't support upgrade from versions < 
3.0.
 +        boolean needsUpgrade = !isCQLTable && checkNeedsUpgrade(columnRows, 
isSuper, isStaticCompactTable);
 +
 +        List<ColumnDefinition> columnDefs = 
createColumnsFromColumnRows(columnRows,
 +                                                                        
ksName,
 +                                                                        
cfName,
 +                                                                        
rawComparator,
 +                                                                        
subComparator,
 +                                                                        
isSuper,
 +                                                                        
isCQLTable,
 +                                                                        
isStaticCompactTable,
 +                                                                        
needsUpgrade);
 +
 +        if (needsUpgrade)
 +            addDefinitionForUpgrade(columnDefs, ksName, cfName, 
isStaticCompactTable, isSuper, rawComparator, subComparator, defaultValidator);
 +
 +        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, 
isCompound, isSuper, isCounter, columnDefs);
 +
 +        cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
 +        
cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
 +        cfm.gcGraceSeconds(tableRow.getInt("gc_grace_seconds"));
 +        
cfm.minCompactionThreshold(tableRow.getInt("min_compaction_threshold"));
 +        
cfm.maxCompactionThreshold(tableRow.getInt("max_compaction_threshold"));
 +        if (tableRow.has("comment"))
 +            cfm.comment(tableRow.getString("comment"));
 +        if (tableRow.has("memtable_flush_period_in_ms"))
 +            
cfm.memtableFlushPeriod(tableRow.getInt("memtable_flush_period_in_ms"));
 +        cfm.caching(CachingOptions.fromString(tableRow.getString("caching")));
 +        if (tableRow.has("default_time_to_live"))
 +            cfm.defaultTimeToLive(tableRow.getInt("default_time_to_live"));
 +        if (tableRow.has("speculative_retry"))
 +            
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(tableRow.getString("speculative_retry")));
 +        
cfm.compactionStrategyClass(CFMetaData.createCompactionStrategy(tableRow.getString("compaction_strategy_class")));
 +        
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(tableRow.getString("compression_parameters"))));
 +        
cfm.compactionStrategyOptions(fromJsonMap(tableRow.getString("compaction_strategy_options")));
 +
 +        if (tableRow.has("min_index_interval"))
 +            cfm.minIndexInterval(tableRow.getInt("min_index_interval"));
 +
 +        if (tableRow.has("max_index_interval"))
 +            cfm.maxIndexInterval(tableRow.getInt("max_index_interval"));
 +
 +        if (tableRow.has("bloom_filter_fp_chance"))
 +            
cfm.bloomFilterFpChance(tableRow.getDouble("bloom_filter_fp_chance"));
 +        else
 +            cfm.bloomFilterFpChance(cfm.getBloomFilterFpChance());
 +
 +        if (tableRow.has("dropped_columns"))
 +            addDroppedColumns(cfm, tableRow.getMap("dropped_columns", 
UTF8Type.instance, LongType.instance), Collections.emptyMap());
 +
 +        cfm.triggers(createTriggersFromTriggerRows(triggerRows));
 +
 +        return cfm;
 +    }
 +
 +    // Should only be called on compact tables
 +    private static boolean checkNeedsUpgrade(UntypedResultSet defs, boolean 
isSuper, boolean isStaticCompactTable)
 +    {
 +        if (isSuper)
 +        {
 +            // Check if we've added the "supercolumn map" column yet or not
 +            for (UntypedResultSet.Row row : defs)
 +                if (row.getString("column_name").isEmpty())
 +                    return false;
 +            return true;
 +        }
 +
 +        // For static compact tables, we need to upgrade if the regular 
definitions haven't been converted to static yet,
 +        // i.e. if we don't have a static definition yet.
 +        if (isStaticCompactTable)
 +            return !hasKind(defs, ColumnDefinition.Kind.STATIC);
 +
 +        // For dense compact tables, we need to upgrade if we don't have a 
compact value definition
 +        return !hasKind(defs, ColumnDefinition.Kind.REGULAR);
 +    }
 +
 +    private static void addDefinitionForUpgrade(List<ColumnDefinition> defs,
 +                                                String ksName,
 +                                                String cfName,
 +                                                boolean isStaticCompactTable,
 +                                                boolean isSuper,
 +                                                AbstractType<?> rawComparator,
 +                                                AbstractType<?> subComparator,
 +                                                AbstractType<?> 
defaultValidator)
 +    {
 +        CompactTables.DefaultNames names = 
CompactTables.defaultNameGenerator(defs);
 +
 +        if (isSuper)
 +        {
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, 
defaultValidator, true), null));
 +        }
 +        else if (isStaticCompactTable)
 +        {
 +            defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, 
names.defaultClusteringName(), rawComparator, null));
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), defaultValidator, null));
 +        }
 +        else
 +        {
 +            // For dense compact tables, we get here if we don't have a 
compact value column, in which case we should add it
 +            // (we use EmptyType to recognize that the compact value was not 
declared by the user (see CreateTableStatement too))
 +            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), EmptyType.instance, null));
 +        }
 +    }
 +
 +    private static boolean hasKind(UntypedResultSet defs, 
ColumnDefinition.Kind kind)
 +    {
 +        for (UntypedResultSet.Row row : defs)
 +        {
 +            if (deserializeKind(row.getString("type")) == kind)
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    private static void addDroppedColumns(CFMetaData cfm, Map<String, Long> 
droppedTimes, Map<String, String> types)
 +    {
 +        for (Map.Entry<String, Long> entry : droppedTimes.entrySet())
 +        {
 +            String name = entry.getKey();
 +            long time = entry.getValue();
 +            AbstractType<?> type = types.containsKey(name) ? 
TypeParser.parse(types.get(name)) : null;
 +            cfm.getDroppedColumns().put(ColumnIdentifier.getInterned(name, 
true), new CFMetaData.DroppedColumn(type, time));
 +        }
 +    }
 +
 +    private static List<ColumnDefinition> 
createColumnsFromColumnRows(UntypedResultSet rows,
 +                                                                      String 
keyspace,
 +                                                                      String 
table,
 +                                                                      
AbstractType<?> rawComparator,
 +                                                                      
AbstractType<?> rawSubComparator,
 +                                                                      boolean 
isSuper,
 +                                                                      boolean 
isCQLTable,
 +                                                                      boolean 
isStaticCompactTable,
 +                                                                      boolean 
needsUpgrade)
 +    {
 +        List<ColumnDefinition> columns = new ArrayList<>();
 +        for (UntypedResultSet.Row row : rows)
 +            columns.add(createColumnFromColumnRow(row, keyspace, table, 
rawComparator, rawSubComparator, isSuper, isCQLTable, isStaticCompactTable, 
needsUpgrade));
 +        return columns;
 +    }
 +
 +    private static ColumnDefinition 
createColumnFromColumnRow(UntypedResultSet.Row row,
 +                                                              String keyspace,
 +                                                              String table,
 +                                                              AbstractType<?> 
rawComparator,
 +                                                              AbstractType<?> 
rawSubComparator,
 +                                                              boolean isSuper,
 +                                                              boolean 
isCQLTable,
 +                                                              boolean 
isStaticCompactTable,
 +                                                              boolean 
needsUpgrade)
 +    {
 +        ColumnDefinition.Kind kind = deserializeKind(row.getString("type"));
 +        if (needsUpgrade && isStaticCompactTable && kind == 
ColumnDefinition.Kind.REGULAR)
 +            kind = ColumnDefinition.Kind.STATIC;
 +
 +        Integer componentIndex = null;
 +        if (row.has("component_index"))
 +            componentIndex = row.getInt("component_index");
 +
 +        // Note: we save the column name as string, but we should not assume 
that it is a UTF8 name;
 +        // we need to use the comparator fromString method
 +        AbstractType<?> comparator = isCQLTable
 +                                   ? UTF8Type.instance
 +                                   : 
CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, 
rawSubComparator);
 +        ColumnIdentifier name = 
ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")),
 comparator);
 +
 +        AbstractType<?> validator = parseType(row.getString("validator"));
 +
 +        IndexType indexType = null;
 +        if (row.has("index_type"))
 +            indexType = IndexType.valueOf(row.getString("index_type"));
 +
 +        Map<String, String> indexOptions = null;
 +        if (row.has("index_options"))
 +            indexOptions = fromJsonMap(row.getString("index_options"));
 +
 +        String indexName = null;
 +        if (row.has("index_name"))
 +            indexName = row.getString("index_name");
 +
 +        return new ColumnDefinition(keyspace, table, name, validator, 
indexType, indexOptions, indexName, componentIndex, kind);
 +    }
 +
 +    private static ColumnDefinition.Kind deserializeKind(String kind)
 +    {
 +        if ("clustering_key".equalsIgnoreCase(kind))
 +            return ColumnDefinition.Kind.CLUSTERING_COLUMN;
 +        if ("compact_value".equalsIgnoreCase(kind))
 +            return ColumnDefinition.Kind.REGULAR;
 +        return Enum.valueOf(ColumnDefinition.Kind.class, kind.toUpperCase());
 +    }
 +
 +    private static Triggers createTriggersFromTriggerRows(UntypedResultSet 
rows)
 +    {
 +        Triggers.Builder triggers = 
org.apache.cassandra.schema.Triggers.builder();
 +        rows.forEach(row -> triggers.add(createTriggerFromTriggerRow(row)));
 +        return triggers.build();
 +    }
 +
 +    private static TriggerMetadata 
createTriggerFromTriggerRow(UntypedResultSet.Row row)
 +    {
 +        String name = row.getString("trigger_name");
 +        String classOption = row.getMap("trigger_options", UTF8Type.instance, 
UTF8Type.instance).get("class");
 +        return new TriggerMetadata(name, classOption);
 +    }
 +
 +    /*
 +     * Reading user types
 +     */
 +
 +    private static Collection<Type> readTypes(String keyspaceName)
 +    {
 +        String query = format("SELECT type_name FROM %s.%s WHERE 
keyspace_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_USERTYPES);
 +        Collection<String> typeNames = new ArrayList<>();
 +        query(query, keyspaceName).forEach(row -> 
typeNames.add(row.getString("type_name")));
 +
 +        Collection<Type> types = new ArrayList<>();
 +        typeNames.forEach(name -> types.add(readType(keyspaceName, name)));
 +        return types;
 +    }
 +
 +    private static Type readType(String keyspaceName, String typeName)
 +    {
 +        long timestamp = readTypeTimestamp(keyspaceName, typeName);
 +        UserType metadata = readTypeMetadata(keyspaceName, typeName);
 +        return new Type(timestamp, metadata);
 +    }
 +
 +    /*
 +     * Unfortunately there is not a single REGULAR column in 
system.schema_usertypes, so annoyingly we cannot
 +     * use the writeTime() CQL function, and must resort to a lower level.
 +     */
 +    private static long readTypeTimestamp(String keyspaceName, String 
typeName)
 +    {
 +        ColumnFamilyStore store = 
org.apache.cassandra.db.Keyspace.open(SystemKeyspace.NAME)
 +                                                                  
.getColumnFamilyStore(SystemKeyspace.LEGACY_USERTYPES);
 +
 +        ClusteringComparator comparator = store.metadata.comparator;
 +        Slices slices = Slices.with(comparator, Slice.make(comparator, 
typeName));
 +        int nowInSec = FBUtilities.nowInSeconds();
 +        DecoratedKey key = 
StorageService.getPartitioner().decorateKey(AsciiType.instance.fromString(keyspaceName));
 +        SinglePartitionReadCommand command = 
SinglePartitionSliceCommand.create(store.metadata, nowInSec, key, slices);
 +
 +        try (OpOrder.Group op = store.readOrdering.start();
 +             RowIterator partition = 
UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, op), 
nowInSec))
 +        {
 +            return partition.next().primaryKeyLivenessInfo().timestamp();
 +        }
 +    }
 +
 +    private static UserType readTypeMetadata(String keyspaceName, String 
typeName)
 +    {
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? 
AND type_name = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_USERTYPES);
 +        UntypedResultSet.Row row = query(query, keyspaceName, typeName).one();
 +
 +        List<ByteBuffer> names =
 +            row.getList("field_names", UTF8Type.instance)
 +               .stream()
 +               .map(ByteBufferUtil::bytes)
 +               .collect(Collectors.toList());
 +
 +        List<AbstractType<?>> types =
 +            row.getList("field_types", UTF8Type.instance)
 +               .stream()
 +               .map(LegacySchemaMigrator::parseType)
 +               .collect(Collectors.toList());
 +
 +        return new UserType(keyspaceName, bytes(typeName), names, types);
 +    }
 +
 +    /*
 +     * Reading UDFs
 +     */
 +
 +    /**
 +     * Reads every user-defined function of a keyspace from the legacy functions system table.
 +     *
 +     * The (function_name, signature) pairs are collected first, then each pair is resolved
 +     * into a {@link Function} through {@link #readFunction}.
 +     *
 +     * @param keyspaceName keyspace whose functions are read
 +     * @return one {@link Function} per distinct (name, signature) pair found
 +     */
 +    private static Collection<Function> readFunctions(String keyspaceName)
 +    {
 +        String cql = format("SELECT function_name, signature FROM %s.%s WHERE keyspace_name = ?",
 +                            SystemKeyspace.NAME,
 +                            SystemKeyspace.LEGACY_FUNCTIONS);
 +
 +        // A function name may be overloaded, hence name -> several signatures.
 +        HashMultimap<String, List<String>> signaturesByName = HashMultimap.create();
 +        for (UntypedResultSet.Row row : query(cql, keyspaceName))
 +            signaturesByName.put(row.getString("function_name"), row.getList("signature", UTF8Type.instance));
 +
 +        return signaturesByName.entries()
 +                               .stream()
 +                               .map(entry -> readFunction(keyspaceName, entry.getKey(), entry.getValue()))
 +                               .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * Builds the {@link Function} holder for one function overload: its write timestamp
 +     * is read first, then its metadata.
 +     *
 +     * @param keyspaceName keyspace the function belongs to
 +     * @param functionName name of the function
 +     * @param signature    argument type signature identifying the overload
 +     * @return the timestamp/metadata pair for the function
 +     */
 +    private static Function readFunction(String keyspaceName, String functionName, List<String> signature)
 +    {
 +        // Arguments are evaluated left to right, so the timestamp query still runs before the metadata query.
 +        return new Function(readFunctionTimestamp(keyspaceName, functionName, signature),
 +                            readFunctionMetadata(keyspaceName, functionName, signature));
 +    }
 +
 +    /**
 +     * Returns the write time of the {@code return_type} column of a function's row in the
 +     * legacy functions system table.
 +     *
 +     * @param keyspaceName keyspace the function belongs to
 +     * @param functionName name of the function
 +     * @param signature    argument type signature identifying the overload
 +     * @return the value of {@code writeTime(return_type)} for the matching row
 +     */
 +    private static long readFunctionTimestamp(String keyspaceName, String functionName, List<String> signature)
 +    {
 +        String cql = format("SELECT writeTime(return_type) AS timestamp FROM %s.%s " +
 +                            "WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
 +                            SystemKeyspace.NAME,
 +                            SystemKeyspace.LEGACY_FUNCTIONS);
 +        UntypedResultSet.Row timestampRow = query(cql, keyspaceName, functionName, signature).one();
 +        return timestampRow.getLong("timestamp");
 +    }
 +
 +    /**
 +     * Rebuilds a user-defined function from its row in the legacy functions system table.
 +     *
 +     * Argument names and types are optional columns; when absent, empty lists are used.
 +     * If {@link UDFunction#create} rejects the definition with an {@link InvalidRequestException},
 +     * a broken placeholder carrying that exception is returned instead of propagating it.
 +     *
 +     * @param keyspaceName keyspace the function belongs to
 +     * @param functionName name of the function
 +     * @param signature    argument type signature identifying the overload
 +     * @return the function metadata, or a broken placeholder when creation is rejected
 +     */
 +    private static UDFunction readFunctionMetadata(String keyspaceName, String functionName, List<String> signature)
 +    {
 +        String cql = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND function_name = ? AND signature = ?",
 +                            SystemKeyspace.NAME,
 +                            SystemKeyspace.LEGACY_FUNCTIONS);
 +        UntypedResultSet.Row functionRow = query(cql, keyspaceName, functionName, signature).one();
 +
 +        FunctionName name = new FunctionName(keyspaceName, functionName);
 +
 +        List<ColumnIdentifier> argNames =
 +            functionRow.has("argument_names")
 +            ? functionRow.getList("argument_names", UTF8Type.instance)
 +                         .stream()
 +                         .map(arg -> new ColumnIdentifier(arg, true))
 +                         .collect(Collectors.toList())
 +            : new ArrayList<>();
 +
 +        List<AbstractType<?>> argTypes =
 +            functionRow.has("argument_types")
 +            ? functionRow.getList("argument_types", UTF8Type.instance)
 +                         .stream()
 +                         .map(LegacySchemaMigrator::parseType)
 +                         .collect(Collectors.toList())
 +            : new ArrayList<>();
 +
 +        AbstractType<?> returnType = parseType(functionRow.getString("return_type"));
 +
 +        String language = functionRow.getString("language");
 +        String body = functionRow.getString("body");
 +        boolean calledOnNullInput = functionRow.getBoolean("called_on_null_input");
 +
 +        try
 +        {
 +            return UDFunction.create(name, argNames, argTypes, returnType, calledOnNullInput, language, body);
 +        }
 +        catch (InvalidRequestException rejected)
 +        {
 +            // Keep the definition around as a broken function rather than failing here.
 +            return UDFunction.createBrokenFunction(name, argNames, argTypes, returnType, calledOnNullInput, language, body, rejected);
 +        }
 +    }
 +
 +    /*
 +     * Reading UDAs
 +     */
 +
 +    /**
 +     * Reads every user-defined aggregate of a keyspace from the legacy aggregates system table.
 +     *
 +     * The (aggregate_name, signature) pairs are collected first, then each pair is resolved
 +     * into an {@link Aggregate} through {@link #readAggregate}.
 +     *
 +     * @param keyspaceName keyspace whose aggregates are read
 +     * @return one {@link Aggregate} per distinct (name, signature) pair found
 +     */
 +    private static Collection<Aggregate> readAggregates(String keyspaceName)
 +    {
 +        String cql = format("SELECT aggregate_name, signature FROM %s.%s WHERE keyspace_name = ?",
 +                            SystemKeyspace.NAME,
 +                            SystemKeyspace.LEGACY_AGGREGATES);
 +
 +        // An aggregate name may be overloaded, hence name -> several signatures.
 +        HashMultimap<String, List<String>> signaturesByName = HashMultimap.create();
 +        for (UntypedResultSet.Row row : query(cql, keyspaceName))
 +            signaturesByName.put(row.getString("aggregate_name"), row.getList("signature", UTF8Type.instance));
 +
 +        return signaturesByName.entries()
 +                               .stream()
 +                               .map(entry -> readAggregate(keyspaceName, entry.getKey(), entry.getValue()))
 +                               .collect(Collectors.toList());
 +    }
 +
 +    /**
 +     * Builds the {@link Aggregate} holder for one aggregate overload: its write timestamp
 +     * is read first, then its metadata.
 +     *
 +     * @param keyspaceName  keyspace the aggregate belongs to
 +     * @param aggregateName name of the aggregate
 +     * @param signature     argument type signature identifying the overload
 +     * @return the timestamp/metadata pair for the aggregate
 +     */
 +    private static Aggregate readAggregate(String keyspaceName, String aggregateName, List<String> signature)
 +    {
 +        // Arguments are evaluated left to right, so the timestamp query still runs before the metadata query.
 +        return new Aggregate(readAggregateTimestamp(keyspaceName, aggregateName, signature),
 +                             readAggregateMetadata(keyspaceName, aggregateName, signature));
 +    }
 +
 +    /**
 +     * Returns the write time of the {@code return_type} column of an aggregate's row in the
 +     * legacy aggregates system table.
 +     *
 +     * @param keyspaceName  keyspace the aggregate belongs to
 +     * @param aggregateName name of the aggregate
 +     * @param signature     argument type signature identifying the overload
 +     * @return the value of {@code writeTime(return_type)} for the matching row
 +     */
 +    private static long readAggregateTimestamp(String keyspaceName, String aggregateName, List<String> signature)
 +    {
 +        String cql = format("SELECT writeTime(return_type) AS timestamp FROM %s.%s " +
 +                            "WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
 +                            SystemKeyspace.NAME,
 +                            SystemKeyspace.LEGACY_AGGREGATES);
 +        UntypedResultSet.Row timestampRow = query(cql, keyspaceName, aggregateName, signature).one();
 +        return timestampRow.getLong("timestamp");
 +    }
 +
 +    /**
 +     * Rebuilds a user-defined aggregate from its row in the legacy aggregates system table.
 +     *
 +     * The state and final function names stored in the row are resolved in the aggregate's
 +     * own keyspace. If {@link UDAggregate#create} rejects the definition with an
 +     * {@link InvalidRequestException}, a broken placeholder carrying that exception is
 +     * returned instead of propagating it.
 +     *
 +     * @param keyspaceName keyspace the aggregate belongs to
 +     * @param functionName name of the aggregate (matched against the aggregate_name column)
 +     * @param signature    argument type signature identifying the overload
 +     * @return the aggregate metadata, or a broken placeholder when creation is rejected
 +     */
 +    private static UDAggregate readAggregateMetadata(String keyspaceName, String functionName, List<String> signature)
 +    {
 +        // The aggregates table is addressed by aggregate_name everywhere else in this class
 +        // (readAggregates, readAggregateTimestamp); function_name is the column used for the functions table.
 +        String query = format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND aggregate_name = ? AND signature = ?",
 +                              SystemKeyspace.NAME,
 +                              SystemKeyspace.LEGACY_AGGREGATES);
 +        UntypedResultSet.Row row = query(query, keyspaceName, functionName, signature).one();
 +
 +        FunctionName name = new FunctionName(keyspaceName, functionName);
 +
 +        List<String> types = row.getList("argument_types", UTF8Type.instance);
 +
 +        // argument_types may be absent; an empty argument list is used in that case.
 +        List<AbstractType<?>> argTypes = new ArrayList<>();
 +        if (types != null)
 +        {
 +            argTypes = new ArrayList<>(types.size());
 +            for (String type : types)
 +                argTypes.add(parseType(type));
 +        }
 +
 +        AbstractType<?> returnType = parseType(row.getString("return_type"));
 +
-         FunctionName stateFunc = parseAggregateFunctionName(keyspaceName, row.getString("state_func"));
-         FunctionName finalFunc = row.has("final_func") ? parseAggregateFunctionName(keyspaceName, row.getString("final_func")) : null;
++        FunctionName stateFunc = new FunctionName(keyspaceName, row.getString("state_func"));
++        FunctionName finalFunc = row.has("final_func") ? new FunctionName(keyspaceName, row.getString("final_func")) : null;
 +        AbstractType<?> stateType = row.has("state_type") ? parseType(row.getString("state_type")) : null;
 +        ByteBuffer initcond = row.has("initcond") ? row.getBytes("initcond") : null;
 +
 +        try
 +        {
 +            return UDAggregate.create(name, argTypes, returnType, stateFunc, finalFunc, stateType, initcond);
 +        }
 +        catch (InvalidRequestException reason)
 +        {
 +            // Keep the definition around as a broken aggregate rather than failing here.
 +            return UDAggregate.createBroken(name, argTypes, returnType, initcond, reason);
 +        }
 +    }
 +
-     private static FunctionName parseAggregateFunctionName(String ksName, 
String func)
-     {
-         int i = func.indexOf('.');
- 
-         // function name can be abbreviated (pre 2.2rc2) - it is in the same 
keyspace as the aggregate
-         if (i == -1)
-             return new FunctionName(ksName, func);
- 
-         String ks = func.substring(0, i);
-         String f = func.substring(i + 1);
- 
-         // only aggregate's function keyspace and system keyspace are allowed
-         assert ks.equals(ksName) || ks.equals(SystemKeyspace.NAME);
- 
-         return new FunctionName(ks, f);
-     }
- 
 +    /**
 +     * Executes a CQL statement by delegating to {@link QueryProcessor#executeOnceInternal}.
 +     *
 +     * @param query  CQL text with {@code ?} bind markers
 +     * @param values values bound to the markers, in order
 +     * @return the result set produced by the statement
 +     */
 +    private static UntypedResultSet query(String query, Object... values)
 +    {
 +        UntypedResultSet rows = QueryProcessor.executeOnceInternal(query, values);
 +        return rows;
 +    }
 +
 +    /**
 +     * Parses a type from its stored string form by delegating to {@link TypeParser#parse}.
 +     *
 +     * @param str string form of the type
 +     * @return the parsed type
 +     */
 +    private static AbstractType<?> parseType(String str)
 +    {
 +        AbstractType<?> parsed = TypeParser.parse(str);
 +        return parsed;
 +    }
 +
 +    /**
 +     * Plain holder for what was read about one keyspace: its name and params, the tables,
 +     * types, functions and aggregates collected for it, and an associated timestamp.
 +     */
 +    private static final class Keyspace
 +    {
 +        final long timestamp;
 +        final String name;
 +        final KeyspaceParams params;
 +        final Collection<Table> tables;
 +        final Collection<Type> types;
 +        final Collection<Function> functions;
 +        final Collection<Aggregate> aggregates;
 +
 +        /** Stores every argument as-is; the collections are not copied. */
 +        Keyspace(final long keyspaceTimestamp,
 +                 final String keyspaceName,
 +                 final KeyspaceParams keyspaceParams,
 +                 final Collection<Table> keyspaceTables,
 +                 final Collection<Type> keyspaceTypes,
 +                 final Collection<Function> keyspaceFunctions,
 +                 final Collection<Aggregate> keyspaceAggregates)
 +        {
 +            timestamp = keyspaceTimestamp;
 +            name = keyspaceName;
 +            params = keyspaceParams;
 +            tables = keyspaceTables;
 +            types = keyspaceTypes;
 +            functions = keyspaceFunctions;
 +            aggregates = keyspaceAggregates;
 +        }
 +    }
 +
 +    /** Plain holder pairing a table's {@link CFMetaData} with an associated timestamp. */
 +    private static final class Table
 +    {
 +        final long timestamp;
 +        final CFMetaData metadata;
 +
 +        /** Stores both arguments as-is. */
 +        Table(final long tableTimestamp, final CFMetaData tableMetadata)
 +        {
 +            timestamp = tableTimestamp;
 +            metadata = tableMetadata;
 +        }
 +    }
 +
 +    /** Plain holder pairing a {@link UserType} with an associated timestamp. */
 +    private static final class Type
 +    {
 +        final long timestamp;
 +        final UserType metadata;
 +
 +        /** Stores both arguments as-is. */
 +        Type(final long typeTimestamp, final UserType typeMetadata)
 +        {
 +            timestamp = typeTimestamp;
 +            metadata = typeMetadata;
 +        }
 +    }
 +
 +    /**
 +     * Plain holder pairing a {@link UDFunction} with a timestamp; readFunction fills the
 +     * timestamp from the write time of the function's return_type column.
 +     */
 +    private static final class Function
 +    {
 +        final long timestamp;
 +        final UDFunction metadata;
 +
 +        /** Stores both arguments as-is. */
 +        Function(final long functionTimestamp, final UDFunction functionMetadata)
 +        {
 +            timestamp = functionTimestamp;
 +            metadata = functionMetadata;
 +        }
 +    }
 +
 +    /**
 +     * Plain holder pairing a {@link UDAggregate} with a timestamp; readAggregate fills the
 +     * timestamp from the write time of the aggregate's return_type column.
 +     */
 +    private static final class Aggregate
 +    {
 +        final long timestamp;
 +        final UDAggregate metadata;
 +
 +        /** Stores both arguments as-is. */
 +        Aggregate(final long aggregateTimestamp, final UDAggregate aggregateMetadata)
 +        {
 +            timestamp = aggregateTimestamp;
 +            metadata = aggregateMetadata;
 +        }
 +    }
 +}

Reply via email to