http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 3e8b0a2..503dd7f 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -26,27 +26,20 @@ import java.util.concurrent.TimeUnit; import javax.management.openmbean.*; import com.google.common.base.Function; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; -import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -54,6 +47,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.metrics.RestorableMeter; +import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; @@ -70,155 +64,21 @@ public final class SystemKeyspace public static final String NAME = "system"; - public static final String SCHEMA_KEYSPACES_TABLE = "schema_keyspaces"; - public static final String SCHEMA_COLUMNFAMILIES_TABLE = "schema_columnfamilies"; - public static final String SCHEMA_COLUMNS_TABLE = "schema_columns"; - public static final String SCHEMA_TRIGGERS_TABLE = "schema_triggers"; - public static final String SCHEMA_USER_TYPES_TABLE = "schema_usertypes"; - public static final String SCHEMA_FUNCTIONS_TABLE = "schema_functions"; - public static final String SCHEMA_AGGREGATES_TABLE = "schema_aggregates"; - - public static final String BUILT_INDEXES_TABLE = "IndexInfo"; - public static final String HINTS_TABLE = "hints"; - public static final String BATCHLOG_TABLE = "batchlog"; - public static final String PAXOS_TABLE = "paxos"; - public static final String LOCAL_TABLE = "local"; - public static final String PEERS_TABLE = "peers"; - public static final String PEER_EVENTS_TABLE = "peer_events"; - public static final String RANGE_XFERS_TABLE = "range_xfers"; - public static final String COMPACTION_LOG_TABLE = "compactions_in_progress"; - public static final String COMPACTION_HISTORY_TABLE = "compaction_history"; - public static final String SSTABLE_ACTIVITY_TABLE = "sstable_activity"; - - public static final List<String> ALL_SCHEMA_TABLES = - Arrays.asList(SCHEMA_KEYSPACES_TABLE, - SCHEMA_COLUMNFAMILIES_TABLE, - SCHEMA_COLUMNS_TABLE, - SCHEMA_TRIGGERS_TABLE, - SCHEMA_USER_TYPES_TABLE, - SCHEMA_FUNCTIONS_TABLE, - SCHEMA_AGGREGATES_TABLE); - - private static int WEEK = (int) TimeUnit.DAYS.toSeconds(7); - - public static final CFMetaData SchemaKeyspacesTable = - compile(SCHEMA_KEYSPACES_TABLE, "keyspace definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "durable_writes boolean," - + "strategy_class text," - + "strategy_options text," - + "PRIMARY KEY ((keyspace_name))) " - + "WITH COMPACT STORAGE") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaColumnFamiliesTable = - compile(SCHEMA_COLUMNFAMILIES_TABLE, "table definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "bloom_filter_fp_chance double," - + "caching text," - + "cf_id uuid," // post-2.1 UUID cfid - + "comment text," - + "compaction_strategy_class text," - + "compaction_strategy_options text," - + "comparator text," - + "compression_parameters text," - + "default_time_to_live int," - + "default_validator text," - + "dropped_columns map<text, bigint>," - + "gc_grace_seconds int," - + "is_dense boolean," - + "key_validator text," - + "local_read_repair_chance double," - + "max_compaction_threshold int," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_compaction_threshold int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "subcomparator text," - + "type text," - + "PRIMARY KEY ((keyspace_name), columnfamily_name))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaColumnsTable = - compile(SCHEMA_COLUMNS_TABLE, "column definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "column_name text," - + "component_index int," - + "index_name text," - + "index_options text," - + "index_type text," - + "type text," - + "validator text," - + "PRIMARY KEY ((keyspace_name), columnfamily_name, column_name))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaTriggersTable = - compile(SCHEMA_TRIGGERS_TABLE, "trigger definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "columnfamily_name text," - + "trigger_name text," - + "trigger_options map<text, text>," - + "PRIMARY KEY ((keyspace_name), columnfamily_name, trigger_name))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaUserTypesTable = - compile(SCHEMA_USER_TYPES_TABLE, "user defined type definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "type_name text," - + "field_names list<text>," - + "field_types list<text>," - + "PRIMARY KEY ((keyspace_name), type_name))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaFunctionsTable = - compile(SCHEMA_FUNCTIONS_TABLE, "user defined function definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "function_name text," - + "signature blob," - + "argument_names list<text>," - + "argument_types list<text>," - + "body text," - + "deterministic boolean," - + "language text," - + "return_type text," - + "PRIMARY KEY ((keyspace_name), function_name, signature))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData SchemaAggregatesTable = - compile(SCHEMA_AGGREGATES_TABLE, "user defined aggregate definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "aggregate_name text," - + "signature blob," - + "argument_types list<text>," - + "return_type text," - + "state_func text," - + "state_type text," - + "final_func text," - + "initcond blob," - + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))") - .gcGraceSeconds(WEEK); - - public static final CFMetaData BuiltIndexesTable = - compile(BUILT_INDEXES_TABLE, "built column indexes", - "CREATE TABLE \"%s\" (" - + "table_name text," - + "index_name text," - + "PRIMARY KEY ((table_name), index_name)) " - + "WITH COMPACT STORAGE"); - - public static final CFMetaData HintsTable = - compile(HINTS_TABLE, "hints awaiting delivery", + public static final String HINTS = "hints"; + public static final String BATCHLOG = "batchlog"; + public static final String PAXOS = "paxos"; + public static final String BUILT_INDEXES = "IndexInfo"; + public static final String LOCAL = "local"; + public static final String PEERS = "peers"; + public static final String PEER_EVENTS = "peer_events"; + public static final String RANGE_XFERS = "range_xfers"; + public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress"; + public static final String COMPACTION_HISTORY = "compaction_history"; + public static final String SSTABLE_ACTIVITY = "sstable_activity"; + + public static final CFMetaData Hints = + compile(HINTS, + "hints awaiting delivery", "CREATE TABLE %s (" + "target_id uuid," + "hint_id timeuuid," @@ -229,8 +89,9 @@ public final class SystemKeyspace .compactionStrategyOptions(Collections.singletonMap("enabled", "false")) .gcGraceSeconds(0); - public static final CFMetaData BatchlogTable = - compile(BATCHLOG_TABLE, "batches awaiting replay", + public static final CFMetaData Batchlog = + compile(BATCHLOG, + "batches awaiting replay", "CREATE TABLE %s (" + "id uuid," + "data blob," @@ -240,8 +101,9 @@ public final class SystemKeyspace .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2")) .gcGraceSeconds(0); - private static final CFMetaData PaxosTable = - compile(PAXOS_TABLE, "in-progress paxos proposals", + private static final CFMetaData Paxos = + compile(PAXOS, + "in-progress paxos proposals", "CREATE TABLE %s (" + "row_key blob," + "cf_id UUID," @@ -253,8 +115,19 @@ public final class SystemKeyspace + "PRIMARY KEY ((row_key), cf_id))") .compactionStrategyClass(LeveledCompactionStrategy.class); - private static final CFMetaData LocalTable = - compile(LOCAL_TABLE, "information about the local node", + // TODO: make private + public static final CFMetaData BuiltIndexes = + compile(BUILT_INDEXES, + "built column indexes", + "CREATE TABLE \"%s\" (" + + "table_name text," + + "index_name text," + + "PRIMARY KEY ((table_name), index_name)) " + + "WITH COMPACT STORAGE"); + + private static final CFMetaData Local = + compile(LOCAL, + "information about the local node", "CREATE TABLE %s (" + "key text," + "bootstrapped text," @@ -273,8 +146,9 @@ public final class SystemKeyspace + "truncated_at map<uuid, blob>," + "PRIMARY KEY ((key)))"); - private static final CFMetaData PeersTable = - compile(PEERS_TABLE, "information about known peers in the cluster", + private static final CFMetaData Peers = + compile(PEERS, + "information about known peers in the cluster", "CREATE TABLE %s (" + "peer inet," + "data_center text," @@ -287,22 +161,25 @@ public final class SystemKeyspace + "tokens set<varchar>," + "PRIMARY KEY ((peer)))"); - private static final CFMetaData PeerEventsTable = - compile(PEER_EVENTS_TABLE, "events related to peers", + private static final CFMetaData PeerEvents = + compile(PEER_EVENTS, + "events related to peers", "CREATE TABLE %s (" + "peer inet," + "hints_dropped map<uuid, int>," + "PRIMARY KEY ((peer)))"); - private static final CFMetaData RangeXfersTable = - compile(RANGE_XFERS_TABLE, "ranges requested for transfer", + private static final CFMetaData RangeXfers = + compile(RANGE_XFERS, + "ranges requested for transfer", "CREATE TABLE %s (" + "token_bytes blob," + "requested_at timestamp," + "PRIMARY KEY ((token_bytes)))"); - private static final CFMetaData CompactionLogTable = - compile(COMPACTION_LOG_TABLE, "unfinished compactions", + private static final CFMetaData CompactionsInProgress = + compile(COMPACTIONS_IN_PROGRESS, + "unfinished compactions", "CREATE TABLE %s (" + "id uuid," + "columnfamily_name text," @@ -310,8 +187,9 @@ public final class SystemKeyspace + "keyspace_name text," + "PRIMARY KEY ((id)))"); - private static final CFMetaData CompactionHistoryTable = - compile(COMPACTION_HISTORY_TABLE, "week-long compaction history", + private static final CFMetaData CompactionHistory = + compile(COMPACTION_HISTORY, + "week-long compaction history", "CREATE TABLE %s (" + "id uuid," + "bytes_in bigint," @@ -321,10 +199,11 @@ public final class SystemKeyspace + "keyspace_name text," + "rows_merged map<int, bigint>," + "PRIMARY KEY ((id)))") - .defaultTimeToLive(WEEK); + .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)); - private static final CFMetaData SSTableActivityTable = - compile(SSTABLE_ACTIVITY_TABLE, "historic sstable read rates", + private static final CFMetaData SSTableActivity = + compile(SSTABLE_ACTIVITY, + "historic sstable read rates", "CREATE TABLE %s (" + "keyspace_name text," + "columnfamily_name text," @@ -333,37 +212,29 @@ public final class SystemKeyspace + "rate_15m double," + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))"); - private static CFMetaData compile(String table, String comment, String cql) + private static CFMetaData compile(String name, String description, String schema) { - return CFMetaData.compile(String.format(cql, table), NAME).comment(comment); + return CFMetaData.compile(String.format(schema, name), NAME).comment(description); } public static KSMetaData definition() { - List<CFMetaData> tables = - Arrays.asList(SchemaKeyspacesTable, - SchemaColumnFamiliesTable, - SchemaColumnsTable, - SchemaTriggersTable, - SchemaUserTypesTable, - SchemaFunctionsTable, - SchemaAggregatesTable, - BuiltIndexesTable, - HintsTable, - BatchlogTable, - PaxosTable, - LocalTable, - PeersTable, - PeerEventsTable, - RangeXfersTable, - CompactionLogTable, - CompactionHistoryTable, - SSTableActivityTable); + Iterable<CFMetaData> tables = + Iterables.concat(LegacySchemaTables.All, + Arrays.asList(BuiltIndexes, + Hints, + Batchlog, + Paxos, + Local, + Peers, + PeerEvents, + RangeXfers, + CompactionsInProgress, + CompactionHistory, + SSTableActivity)); return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables); } - private static final String LOCAL_KEY = "local"; - private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; public enum BootstrapState @@ -381,24 +252,15 @@ public final class SystemKeyspace public static void finishStartup() { setupVersion(); - - // add entries to system schema columnfamilies for the hardcoded system definitions - KSMetaData ksmd = Schema.instance.getKSMetaData(NAME); - - // delete old, possibly obsolete entries in schema tables - for (String table : ALL_SCHEMA_TABLES) - executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", table), ksmd.name); - - // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added) - ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply(); + LegacySchemaTables.saveSystemKeyspaceSchema(); } private static void setupVersion() { String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"; IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - executeOnceInternal(String.format(req, LOCAL_TABLE), - LOCAL_KEY, + executeOnceInternal(String.format(req, LOCAL), + LOCAL, FBUtilities.getReleaseVersionString(), QueryProcessor.CQL_VERSION.toString(), cassandraConstants.VERSION, @@ -429,8 +291,8 @@ public final class SystemKeyspace } }); String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTION_LOG_TABLE), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - forceBlockingFlush(COMPACTION_LOG_TABLE); + executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); + forceBlockingFlush(COMPACTIONS_IN_PROGRESS); return compactionId; } @@ -443,8 +305,8 @@ public final class SystemKeyspace { assert taskId != null; - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTION_LOG_TABLE), taskId); - forceBlockingFlush(COMPACTION_LOG_TABLE); + executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); + forceBlockingFlush(COMPACTIONS_IN_PROGRESS); } /** @@ -454,7 +316,7 @@ public final class SystemKeyspace public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions() { String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTION_LOG_TABLE)); + UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>(); for (UntypedResultSet.Row row : resultSet) @@ -479,7 +341,7 @@ public final class SystemKeyspace public static void discardCompactionsInProgress() { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTION_LOG_TABLE); + ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); compactionLog.truncateBlocking(); } @@ -491,24 +353,24 @@ public final class SystemKeyspace Map<Integer, Long> rowsMerged) { // don't write anything when the history table itself is compacted, since that would in turn cause new compactions - if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY_TABLE)) + if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY)) return; String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTION_HISTORY_TABLE), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged); + executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged); } public static TabularData getCompactionHistory() throws OpenDataException { - UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY_TABLE)); + UntypedResultSet queryResultSet = executeInternal(String.format("SELECT * from system.%s", COMPACTION_HISTORY)); return CompactionHistoryTabularData.from(queryResultSet); } public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), truncationAsMapEntry(cfs, truncatedAt, position)); + executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); truncationRecords = null; - forceBlockingFlush(LOCAL_TABLE); + forceBlockingFlush(LOCAL); } /** @@ -517,9 +379,9 @@ public final class SystemKeyspace public static synchronized void removeTruncationRecord(UUID cfId) { String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), cfId); + executeInternal(String.format(req, LOCAL, LOCAL), cfId); truncationRecords = null; - forceBlockingFlush(LOCAL_TABLE); + forceBlockingFlush(LOCAL); } private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) @@ -558,7 +420,7 @@ public final class SystemKeyspace private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords() { - UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>(); @@ -597,14 +459,14 @@ public final class SystemKeyspace } String req = "INSERT INTO system.%s (peer, tokens) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS_TABLE), ep, tokensAsSet(tokens)); + executeInternal(String.format(req, PEERS), ep, tokensAsSet(tokens)); } public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip) { String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS_TABLE), ep, preferred_ip); - forceBlockingFlush(PEERS_TABLE); + executeInternal(String.format(req, PEERS), ep, preferred_ip); + forceBlockingFlush(PEERS); } public static synchronized void updatePeerInfo(InetAddress ep, String columnName, Object value) @@ -613,20 +475,20 @@ public final class SystemKeyspace return; String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; - executeInternal(String.format(req, PEERS_TABLE, columnName), ep, value); + executeInternal(String.format(req, PEERS, columnName), ep, value); } public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) { // with 30 day TTL String req = "UPDATE system.%s USING TTL 2592000 SET hints_dropped[ ? ] = ? WHERE peer = ?"; - executeInternal(String.format(req, PEER_EVENTS_TABLE), timePeriod, value, ep); + executeInternal(String.format(req, PEER_EVENTS), timePeriod, value, ep); } public static synchronized void updateSchemaVersion(UUID version) { String req = "INSERT INTO system.%s (key, schema_version) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), version); + executeInternal(String.format(req, LOCAL, LOCAL), version); } private static Set<String> tokensAsSet(Collection<Token> tokens) @@ -653,7 +515,7 @@ public final class SystemKeyspace public static synchronized void removeEndpoint(InetAddress ep) { String req = "DELETE FROM system.%s WHERE peer = ?"; - executeInternal(String.format(req, PEERS_TABLE), ep); + executeInternal(String.format(req, PEERS), ep); } /** @@ -663,8 +525,8 @@ public final class SystemKeyspace { assert !tokens.isEmpty() : "removeEndpoint should be used instead"; String req = "INSERT INTO system.%s (key, tokens) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), tokensAsSet(tokens)); - forceBlockingFlush(LOCAL_TABLE); + executeInternal(String.format(req, LOCAL, LOCAL), tokensAsSet(tokens)); + forceBlockingFlush(LOCAL); } /** @@ -696,7 +558,7 @@ public final class SystemKeyspace public static SetMultimap<InetAddress, Token> loadTokens() { SetMultimap<InetAddress, Token> tokenMap = HashMultimap.create(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS_TABLE)) + for (UntypedResultSet.Row row : executeInternal("SELECT peer, tokens FROM system." + PEERS)) { InetAddress peer = row.getInetAddress("peer"); if (row.has("tokens")) @@ -713,7 +575,7 @@ public final class SystemKeyspace public static Map<InetAddress, UUID> loadHostIds() { Map<InetAddress, UUID> hostIdMap = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS_TABLE)) + for (UntypedResultSet.Row row : executeInternal("SELECT peer, host_id FROM system." + PEERS)) { InetAddress peer = row.getInetAddress("peer"); if (row.has("host_id")) @@ -733,7 +595,7 @@ public final class SystemKeyspace public static InetAddress getPreferredIP(InetAddress ep) { String req = "SELECT preferred_ip FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS_TABLE), ep); + UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); if (!result.isEmpty() && result.one().has("preferred_ip")) return result.one().getInetAddress("preferred_ip"); return ep; @@ -745,7 +607,7 @@ public final class SystemKeyspace public static Map<InetAddress, Map<String,String>> loadDcRackInfo() { Map<InetAddress, Map<String, String>> result = new HashMap<>(); - for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS_TABLE)) + for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS)) { InetAddress peer = row.getInetAddress("peer"); if (row.has("data_center") && row.has("rack")) @@ -780,10 +642,10 @@ public final class SystemKeyspace ex.initCause(err); throw ex; } - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL_TABLE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(LOCAL); String req = "SELECT cluster_name FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); if (result.isEmpty() || !result.one().has("cluster_name")) { @@ -793,7 +655,7 @@ public final class SystemKeyspace // no system files. this is a new node. req = "INSERT INTO system.%s (key, cluster_name) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), DatabaseDescriptor.getClusterName()); + executeInternal(String.format(req, LOCAL, LOCAL), DatabaseDescriptor.getClusterName()); return; } @@ -805,7 +667,7 @@ public final class SystemKeyspace public static Collection<Token> getSavedTokens() { String req = "SELECT tokens FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); return result.isEmpty() || !result.one().has("tokens") ? Collections.<Token>emptyList() : deserializeTokens(result.one().getSet("tokens", UTF8Type.instance)); @@ -814,7 +676,7 @@ public final class SystemKeyspace public static int incrementAndGetGeneration() { String req = "SELECT gossip_generation FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); int generation; if (result.isEmpty() || !result.one().has("gossip_generation")) @@ -842,8 +704,8 @@ public final class SystemKeyspace } req = "INSERT INTO system.%s (key, gossip_generation) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), generation); - forceBlockingFlush(LOCAL_TABLE); + executeInternal(String.format(req, LOCAL, LOCAL), generation); + forceBlockingFlush(LOCAL); return generation; } @@ -851,7 +713,7 @@ public final class SystemKeyspace public static BootstrapState getBootstrapState() { String req = "SELECT bootstrapped FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); if (result.isEmpty() || !result.one().has("bootstrapped")) return BootstrapState.NEEDS_BOOTSTRAP; @@ -872,15 +734,15 @@ public final class SystemKeyspace public static void setBootstrapState(BootstrapState state) { String req = "INSERT INTO system.%s (key, bootstrapped) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), state.name()); - forceBlockingFlush(LOCAL_TABLE); + executeInternal(String.format(req, LOCAL, LOCAL), state.name()); + forceBlockingFlush(LOCAL); } public static boolean isIndexBuilt(String keyspaceName, String indexName) { - ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES_TABLE); + ColumnFamilyStore cfs = Keyspace.open(NAME).getColumnFamilyStore(BUILT_INDEXES); QueryFilter filter = QueryFilter.getNamesFilter(decorate(ByteBufferUtil.bytes(keyspaceName)), - BUILT_INDEXES_TABLE, + BUILT_INDEXES, FBUtilities.singleton(cfs.getComparator().makeCellName(indexName), cfs.getComparator()), System.currentTimeMillis()); return ColumnFamilyStore.removeDeleted(cfs.getColumnFamily(filter), Integer.MAX_VALUE) != null; @@ -888,7 +750,7 @@ public final class SystemKeyspace public static void setIndexBuilt(String keyspaceName, String indexName) { - ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES_TABLE); + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(NAME, BUILT_INDEXES); cf.addColumn(new BufferCell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros())); new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName), cf).apply(); } @@ -896,7 +758,7 @@ public final class SystemKeyspace public static void setIndexRemoved(String keyspaceName, String indexName) { Mutation mutation = new Mutation(NAME, ByteBufferUtil.bytes(keyspaceName)); - mutation.delete(BUILT_INDEXES_TABLE, BuiltIndexesTable.comparator.makeCellName(indexName), FBUtilities.timestampMicros()); + mutation.delete(BUILT_INDEXES, BuiltIndexes.comparator.makeCellName(indexName), FBUtilities.timestampMicros()); mutation.apply(); } @@ -907,7 +769,7 @@ public final class SystemKeyspace public static UUID getLocalHostId() { String req = "SELECT host_id FROM system.%s WHERE key='%s'"; - UntypedResultSet result = executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY)); + UntypedResultSet result = executeInternal(String.format(req, LOCAL, LOCAL)); // Look up the Host UUID (return it if found) if (!result.isEmpty() && result.one().has("host_id")) @@ -925,144 +787,14 @@ public final class SystemKeyspace public static UUID setLocalHostId(UUID hostId) { String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', ?)"; - executeInternal(String.format(req, LOCAL_TABLE, LOCAL_KEY), hostId); + executeInternal(String.format(req, LOCAL, LOCAL), hostId); return hostId; } - /** - * @param cfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns) - * @return CFS responsible to hold low-level serialized schema - */ - public static ColumnFamilyStore schemaCFS(String cfName) - { - return Keyspace.open(NAME).getColumnFamilyStore(cfName); - } - - public static List<Row> serializedSchema() - { - List<Row> schema = new ArrayList<>(); - - for (String cf : ALL_SCHEMA_TABLES) - schema.addAll(serializedSchema(cf)); - - return schema; - } - - /** - * @param schemaCfName The name of the ColumnFamily responsible for part of the schema (keyspace, ColumnFamily, columns) - * @return low-level schema representation (each row represents individual Keyspace or ColumnFamily) - */ - public static List<Row> serializedSchema(String schemaCfName) - { - Token minToken = StorageService.getPartitioner().getMinimumToken(); - - return schemaCFS(schemaCfName).getRangeSlice(new Range<RowPosition>(minToken.minKeyBound(), minToken.maxKeyBound()), - null, - new IdentityQueryFilter(), - Integer.MAX_VALUE, - System.currentTimeMillis()); - } - - public static Collection<Mutation> serializeSchema() - { - Map<DecoratedKey, Mutation> mutationMap = new HashMap<>(); - - for (String cf : ALL_SCHEMA_TABLES) - serializeSchema(mutationMap, cf); - - return mutationMap.values(); - } - - private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName) - { - for (Row schemaRow : serializedSchema(schemaCfName)) - { - if (Schema.ignoredSchemaRow(schemaRow)) - continue; - - Mutation mutation = mutationMap.get(schemaRow.key); - if (mutation == null) - { - mutation = new Mutation(NAME, schemaRow.key.getKey()); - mutationMap.put(schemaRow.key, mutation); - } - - mutation.add(schemaRow.cf); - } - } - - public static Map<DecoratedKey, ColumnFamily> getSchema(String cfName) - { - Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); - - for (Row schemaEntity : SystemKeyspace.serializedSchema(cfName)) - schema.put(schemaEntity.key, schemaEntity.cf); - - return schema; - } - - public static Map<DecoratedKey, ColumnFamily> getSchema(String schemaCfName, Set<String> keyspaces) - { - Map<DecoratedKey, ColumnFamily> schema = new HashMap<>(); - - for (String keyspace : keyspaces) - { - Row schemaEntity = readSchemaRow(schemaCfName, keyspace); - if (schemaEntity.cf != null) - schema.put(schemaEntity.key, schemaEntity.cf); - } - - return schema; - } - - public static ByteBuffer getSchemaKSKey(String ksName) - { - return AsciiType.instance.fromString(ksName); - } - - /** - * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace. - * - * @param schemaCfName the schema table to get the data from (schema_keyspaces, schema_columnfamilies, schema_columns or schema_triggers) - * @param ksName the keyspace of the tables we are interested in - * @return a Row containing the schema data of a particular type for the keyspace - */ - public static Row readSchemaRow(String schemaCfName, String ksName) - { - DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName)); - - ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName); - ColumnFamily result = schemaCFS.getColumnFamily(QueryFilter.getIdentityFilter(key, schemaCfName, System.currentTimeMillis())); - - return new Row(key, result); - } - - /** - * Fetches a subset of schema (table data, columns metadata or triggers) for the keyspace+table pair. - * - * @param schemaCfName the schema table to get the data from (schema_columnfamilies, schema_columns or schema_triggers) - * @param ksName the keyspace of the table we are interested in - * @param cfName the table we are interested in - * @return a Row containing the schema data of a particular type for the table - */ - public static Row readSchemaRow(String schemaCfName, String ksName, String cfName) - { - DecoratedKey key = StorageService.getPartitioner().decorateKey(getSchemaKSKey(ksName)); - ColumnFamilyStore schemaCFS = SystemKeyspace.schemaCFS(schemaCfName); - Composite prefix = schemaCFS.getComparator().make(cfName); - ColumnFamily cf = schemaCFS.getColumnFamily(key, - prefix, - prefix.end(), - false, - Integer.MAX_VALUE, - System.currentTimeMillis()); - return new Row(key, cf); - } - public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS_TABLE), key, metadata.cfId); + UntypedResultSet results = executeInternal(String.format(req, PAXOS), key, metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); @@ -1083,7 +815,7 @@ public final class SystemKeyspace public static void savePaxosPromise(Commit promise) { String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(req, PAXOS_TABLE), + executeInternal(String.format(req, PAXOS), UUIDGen.microsTimestamp(promise.ballot), paxosTtl(promise.update.metadata), promise.ballot, @@ -1093,7 +825,7 @@ public final class SystemKeyspace public static void savePaxosProposal(Commit proposal) { - executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS_TABLE), + executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ? WHERE row_key = ? AND cf_id = ?", PAXOS), UUIDGen.microsTimestamp(proposal.ballot), paxosTtl(proposal.update.metadata), proposal.ballot, @@ -1113,7 +845,7 @@ public final class SystemKeyspace // We always erase the last proposal (with the commit timestamp to no erase more recent proposal in case the commit is old) // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposal older than the mrc. String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ? WHERE row_key = ? AND cf_id = ?"; - executeInternal(String.format(cql, PAXOS_TABLE), + executeInternal(String.format(cql, PAXOS), UUIDGen.microsTimestamp(commit.ballot), paxosTtl(commit.update.metadata), commit.ballot, @@ -1132,7 +864,7 @@ public final class SystemKeyspace public static RestorableMeter getSSTableReadMeter(String keyspace, String table, int generation) { String cql = "SELECT * FROM system.%s WHERE keyspace_name=? and columnfamily_name=? and generation=?"; - UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation); + UntypedResultSet results = executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); if (results.isEmpty()) return new RestorableMeter(); @@ -1150,7 +882,7 @@ public final class SystemKeyspace { // Store values with a one-day TTL to handle corner cases where cleanup might not occur String cql = "INSERT INTO system.%s (keyspace_name, columnfamily_name, generation, rate_15m, rate_120m) VALUES (?, ?, ?, ?, ?) USING TTL 864000"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), + executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation, @@ -1164,6 +896,6 @@ public final class SystemKeyspace public static void clearSSTableReadMeter(String keyspace, String table, int generation) { String cql = "DELETE FROM system.%s WHERE keyspace_name=? AND columnfamily_name=? and generation=?"; - executeInternal(String.format(cql, SSTABLE_ACTIVITY_TABLE), keyspace, table, generation); + executeInternal(String.format(cql, SSTABLE_ACTIVITY), keyspace, table, generation); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java index 0b52904..8be9a18 100644 --- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java @@ -24,17 +24,19 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.*; -import org.apache.cassandra.db.BufferCell; -import org.apache.cassandra.db.Cell; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.BufferCell; +import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -228,17 +230,15 @@ public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap { partitioner = FBUtilities.newPartitioner(client.describe_partitioner()); // get CF meta data - String query = "SELECT comparator," + - " subcomparator," + - " type " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s' " + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, cfName)), - Compression.NONE, - ConsistencyLevel.ONE); + String query = String.format("SELECT comparator, subcomparator, type " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace, + cfName); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); Iterator<CqlRow> iteraRow = result.rows.iterator(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java index 21e30e2..ffaaea9 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java @@ -22,17 +22,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -40,10 +30,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; - import org.apache.commons.lang3.StringUtils; - -import org.apache.cassandra.hadoop.HadoopCompat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +42,8 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.TupleValue; import com.datastax.driver.core.UDTValue; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.hadoop.ColumnFamilySplit; @@ -601,8 +590,15 @@ public class CqlRecordReader extends RecordReader<Long, Row> private void fetchKeys() { - String query = "SELECT column_name, component_index, type FROM system.schema_columns WHERE keyspace_name='" + - keyspace + "' and columnfamily_name='" + cfName + "'"; + String query = String.format("SELECT column_name, component_index, type " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNS, + keyspace, + cfName); + + // get CF meta data List<Row> rows = session.execute(query).all(); if (rows.isEmpty()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java index 311359a..0956ba5 100644 --- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java @@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.LongType; @@ -297,10 +300,6 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB { result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE); } - catch (InvalidRequestException e) - { - throw new RuntimeException("failed to prepare cql query " + cql, e); - } catch (TException e) { throw new RuntimeException("failed to prepare cql query " + cql, e); @@ -331,18 +330,20 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB return partitionKey; } + // FIXME /** retrieve the key validator from system.schema_columnfamilies table */ private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception { String keyspace = ConfigHelper.getOutputKeyspace(conf); String cfName = ConfigHelper.getOutputColumnFamily(conf); - String query = "SELECT key_validator," + - " key_aliases," + - " column_aliases " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name='%s' and columnfamily_name='%s'"; - String formatted = String.format(query, keyspace, cfName); - CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(formatted), Compression.NONE, ConsistencyLevel.ONE); + String query = String.format("SELECT key_validator, key_aliases, column_aliases " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' and columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace, + cfName); + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); Column rawKeyValidator = result.rows.get(0).columns.get(0); String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java index dc37252..04d207f 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java @@ -26,6 +26,8 @@ import java.nio.charset.CharacterCodingException; import java.util.*; import org.apache.cassandra.db.Cell; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.auth.IAuthenticator; @@ -585,20 +587,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store IOException { // get CF meta data - String query = "SELECT type," + - " comparator," + - " subcomparator," + - " default_validator," + - " key_validator," + - " key_aliases " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s' " + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); + String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator, key_aliases " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace, + column_family); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); if (result == null || result.rows == null || result.rows.isEmpty()) return null; @@ -657,18 +654,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store ConfigurationException, NotFoundException { - String query = "SELECT column_name, " + - " validator, " + - " index_type, " + - " type " + - "FROM system.schema_columns " + - "WHERE keyspace_name = '%s' " + - " AND columnfamily_name = '%s'"; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); + String query = String.format("SELECT column_name, validator, index_type, type " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNS, + keyspace, + column_family); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); List<CqlRow> rows = result.rows; List<ColumnDef> columnDefs = new ArrayList<ColumnDef>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java index 6cd5c66..fca1d43 100644 --- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java +++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java @@ -25,6 +25,8 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.BufferCell; +import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.composites.CellNames; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.marshal.*; @@ -482,21 +484,15 @@ public class CqlStorage extends AbstractCassandraStorage protected List<ColumnDef> getKeysMeta(Cassandra.Client client) throws Exception { - String query = "SELECT key_aliases, " + - " column_aliases, " + - " key_validator, " + - " comparator, " + - " keyspace_name, " + - " value_alias, " + - " default_validator " + - "FROM system.schema_columnfamilies " + - "WHERE keyspace_name = '%s'" + - " AND columnfamily_name = '%s' "; - - CqlResult result = client.execute_cql3_query( - ByteBufferUtil.bytes(String.format(query, keyspace, column_family)), - Compression.NONE, - ConsistencyLevel.ONE); + String query = String.format("SELECT key_aliases, column_aliases, key_validator, comparator, keyspace_name, value_alias, default_validator " + + "FROM %s.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNFAMILIES, + keyspace, + column_family); + + CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE); if (result == null || result.rows == null || result.rows.isEmpty()) return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e9d345f/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 43cd2c0..ec590f3 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -349,16 +349,16 @@ public class CQLSSTableWriter implements Closeable if (ksm == null) { ksm = KSMetaData.newKeyspace(this.schema.ksName, - AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), - ImmutableMap.of("replication_factor", "1"), - true, - Collections.singleton(this.schema)); + AbstractReplicationStrategy.getClass("org.apache.cassandra.locator.SimpleStrategy"), + ImmutableMap.of("replication_factor", "1"), + true, + Collections.singleton(this.schema)); Schema.instance.load(ksm); } else if (Schema.instance.getCFMetaData(this.schema.ksName, this.schema.cfName) == null) { Schema.instance.load(this.schema); - ksm = KSMetaData.cloneWith(ksm, Iterables.concat(ksm.cfMetaData().values(), Collections.singleton(this.schema))); + ksm = ksm.cloneWithTableAdded(this.schema); Schema.instance.setKeyspaceDefinition(ksm); Keyspace.open(ksm.name).initCf(this.schema.cfId, this.schema.cfName, false); }
