Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9b2c467f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9b2c467f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9b2c467f Branch: refs/heads/trunk Commit: 9b2c467f247ef4e3992a5c4d7f68201479cdc905 Parents: b2e4d10 a90c80d Author: Aleksey Yeschenko <[email protected]> Authored: Wed Jun 3 14:17:07 2015 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Jun 3 14:17:07 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 32 ++++++++++++-------- 2 files changed, 20 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b2c467f/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 42846d7,04650c1..8124256 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -21,127 -6,6 +21,128 @@@ Merged from 2.1 * Improve estimated row count (CASSANDRA-9107) * Optimize range tombstone memory footprint (CASSANDRA-8603) * Use configured gcgs in anticompaction (CASSANDRA-9397) +Merged from 2.0: ++ * Add broadcast and rpc addresses to system.local (CASSANDRA-9436) + * Fix null static columns in pages after the first, paged reversed + queries (CASSANDRA-8502) + + +2.2.0-beta1 + * Introduce Transactional API for internal state changes (CASSANDRA-8984) + * Add a flag in cassandra.yaml to enable UDFs (CASSANDRA-9404) + * Better support of null for UDF (CASSANDRA-8374) + * Use ecj instead of javassist for UDFs (CASSANDRA-8241) + * faster async logback configuration for tests (CASSANDRA-9376) + * Add `smallint` and `tinyint` data types (CASSANDRA-8951) + * Avoid thrift schema creation when native driver is used in stress tool (CASSANDRA-9374) + * Populate TokenMetadata early during startup (CASSANDRA-9317) + * Make Functions.declared thread-safe + * Add client warnings to native protocol v4 (CASSANDRA-8930) + * Allow roles cache to be invalidated (CASSANDRA-8967) + * Upgrade Snappy (CASSANDRA-9063) + * Don't start Thrift rpc by default (CASSANDRA-9319) + * Only stream from unrepaired sstables with incremental repair (CASSANDRA-8267) + * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC specified (CASSANDRA-9321) + * Remove Thrift dependencies in bundled tools (CASSANDRA-8358) + * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242) + * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049) + * Distinguish between null and unset in protocol v4 (CASSANDRA-7304) + * Add user/role permissions for user-defined functions (CASSANDRA-7557) + * Allow cassandra config to be updated to restart daemon without unloading classes (CASSANDRA-9046) + * Don't initialize compaction writer before checking if iter is empty (CASSANDRA-9117) + * Don't execute any functions at prepare-time (CASSANDRA-9037) + * Share file handles between all instances of a SegmentedFile (CASSANDRA-8893) + * Make it possible to major compact LCS (CASSANDRA-7272) + * Make FunctionExecutionException extend RequestExecutionException + (CASSANDRA-9055) + * Add support for SELECT JSON, INSERT JSON syntax and new toJson(), fromJson() + functions (CASSANDRA-7970) + * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920) + * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670) + * New tool added to validate all sstables in a node (CASSANDRA-5791) + * Push notification when tracing completes for an operation (CASSANDRA-7807) + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236) + * Compressed Commit Log (CASSANDRA-6809) + * Optimise IntervalTree (CASSANDRA-8988) + * Add a key-value payload for third party usage (CASSANDRA-8553, 9212) + * Bump metrics-reporter-config dependency for metrics 3.0 (CASSANDRA-8149) + * Partition intra-cluster message streams by size, not type (CASSANDRA-8789) + * Add WriteFailureException to native protocol, notify coordinator of + write failures (CASSANDRA-8592) + * Convert SequentialWriter to nio (CASSANDRA-8709) + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850) + * Record client ip address in tracing sessions (CASSANDRA-8162) + * Indicate partition key columns in response metadata for prepared + statements (CASSANDRA-7660) + * Merge UUIDType and TimeUUIDType parse logic (CASSANDRA-8759) + * Avoid memory allocation when searching index summary (CASSANDRA-8793) + * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730) + * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836) + * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714, 9197) + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268) + * Upgrade Metrics library and remove depricated metrics (CASSANDRA-5657) + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438) + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560) + * Support direct buffer decompression for reads (CASSANDRA-8464) + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039) + * Group sstables for anticompaction correctly (CASSANDRA-8578) + * Add ReadFailureException to native protocol, respond + immediately when replicas encounter errors while handling + a read request (CASSANDRA-7886) + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308) + * Allow mixing token and partition key restrictions (CASSANDRA-7016) + * Support index key/value entries on map collections (CASSANDRA-8473) + * Modernize schema tables (CASSANDRA-8261) + * Support for user-defined aggregation functions (CASSANDRA-8053) + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419) + * Refactor SelectStatement, return IN results in natural order instead + of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981) + * Support UDTs, tuples, and collections in user-defined + functions (CASSANDRA-7563) + * Fix aggregate fn results on empty selection, result column name, + and cqlsh parsing (CASSANDRA-8229) + * Mark sstables as repaired after full repair (CASSANDRA-7586) + * Extend Descriptor to include a format value and refactor reader/writer + APIs (CASSANDRA-7443) + * Integrate JMH for microbenchmarks (CASSANDRA-8151) + * Keep sstable levels when bootstrapping (CASSANDRA-7460) + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838) + * Support for aggregation functions (CASSANDRA-4914) + * Remove cassandra-cli (CASSANDRA-7920) + * Accept dollar quoted strings in CQL (CASSANDRA-7769) + * Make assassinate a first class command (CASSANDRA-7935) + * Support IN clause on any partition key column (CASSANDRA-7855) + * Support IN clause on any clustering column (CASSANDRA-4762) + * Improve compaction logging (CASSANDRA-7818) + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917) + * Do anticompaction in groups (CASSANDRA-6851) + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929, + 7924, 7812, 8063, 7813, 7708) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + * Fail on very large batch sizes (CASSANDRA-8011) + * Improve concurrency of repair (CASSANDRA-6455, 8208, 9145) + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096) + * Generalize progress reporting (CASSANDRA-8901) + * Resumable bootstrap streaming (CASSANDRA-8838, CASSANDRA-8942) + * Allow scrub for secondary index (CASSANDRA-5174) + * Save repair data to system table (CASSANDRA-5839) + * fix nodetool names that reference column families (CASSANDRA-8872) + + +2.1.6 * Warn on misuse of unlogged batches (CASSANDRA-9282) * Failure detector detects and ignores local pauses (CASSANDRA-9183) * Add utility class to support for rate limiting a given log statement (CASSANDRA-9029) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9b2c467f/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 9956728,882dbdf..64d11f6 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -67,213 -66,33 +67,215 @@@ public final class SystemKeyspac { private static final Logger logger = LoggerFactory.getLogger(SystemKeyspace.class); - // see CFMetaData for schema definitions - public static final String PEERS_CF = "peers"; - public static final String PEER_EVENTS_CF = "peer_events"; - public static final String LOCAL_CF = "local"; - public static final String INDEX_CF = "IndexInfo"; - public static final String HINTS_CF = "hints"; - public static final String RANGE_XFERS_CF = "range_xfers"; - public static final String BATCHLOG_CF = "batchlog"; - // see layout description in the DefsTables class header - public static final String SCHEMA_KEYSPACES_CF = "schema_keyspaces"; - public static final String SCHEMA_COLUMNFAMILIES_CF = "schema_columnfamilies"; - public static final String SCHEMA_COLUMNS_CF = "schema_columns"; - public static final String SCHEMA_TRIGGERS_CF = "schema_triggers"; - public static final String SCHEMA_USER_TYPES_CF = "schema_usertypes"; - public static final String COMPACTION_LOG = "compactions_in_progress"; - public static final String PAXOS_CF = "paxos"; - public static final String SSTABLE_ACTIVITY_CF = "sstable_activity"; - public static final String COMPACTION_HISTORY_CF = "compaction_history"; - public static final String SIZE_ESTIMATES_CF = "size_estimates"; - - private static final String LOCAL_KEY = "local"; - - public static final List<String> allSchemaCfs = Arrays.asList(SCHEMA_KEYSPACES_CF, - SCHEMA_COLUMNFAMILIES_CF, - SCHEMA_COLUMNS_CF, - SCHEMA_TRIGGERS_CF, - SCHEMA_USER_TYPES_CF); + // Used to indicate that there was a previous version written to the legacy (pre 1.2) + // system.Versions table, but that we cannot read it. Suffice to say, any upgrade should + // proceed through 1.2.x before upgrading to the current version. + public static final CassandraVersion UNREADABLE_VERSION = new CassandraVersion("0.0.0-unknown"); + + // Used to indicate that no previous version information was found. When encountered, we assume that + // Cassandra was not previously installed and we're in the process of starting a fresh node. + public static final CassandraVersion NULL_VERSION = new CassandraVersion("0.0.0-absent"); + + public static final String NAME = "system"; + + public static final String HINTS = "hints"; + public static final String BATCHLOG = "batchlog"; + public static final String PAXOS = "paxos"; + public static final String BUILT_INDEXES = "IndexInfo"; + public static final String LOCAL = "local"; + public static final String PEERS = "peers"; + public static final String PEER_EVENTS = "peer_events"; + public static final String RANGE_XFERS = "range_xfers"; + public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress"; + public static final String COMPACTION_HISTORY = "compaction_history"; + public static final String SSTABLE_ACTIVITY = "sstable_activity"; + public static final String SIZE_ESTIMATES = "size_estimates"; + public static final String AVAILABLE_RANGES = "available_ranges"; + + public static final CFMetaData Hints = + compile(HINTS, + "hints awaiting delivery", + "CREATE TABLE %s (" + + "target_id uuid," + + "hint_id timeuuid," + + "message_version int," + + "mutation blob," + + "PRIMARY KEY ((target_id), hint_id, message_version)) " + + "WITH COMPACT STORAGE") + .compactionStrategyOptions(Collections.singletonMap("enabled", "false")) + .gcGraceSeconds(0); + + public static final CFMetaData Batchlog = + compile(BATCHLOG, + "batches awaiting replay", + "CREATE TABLE %s (" + + "id uuid," + + "data blob," + + "version int," + + "written_at timestamp," + + "PRIMARY KEY ((id)))") + .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2")) + .gcGraceSeconds(0); + + private static final CFMetaData Paxos = + compile(PAXOS, + "in-progress paxos proposals", + "CREATE TABLE %s (" + + "row_key blob," + + "cf_id UUID," + + "in_progress_ballot timeuuid," + + "most_recent_commit blob," + + "most_recent_commit_at timeuuid," + + "proposal blob," + + "proposal_ballot timeuuid," + + "PRIMARY KEY ((row_key), cf_id))") + .compactionStrategyClass(LeveledCompactionStrategy.class); + + // TODO: make private + public static final CFMetaData BuiltIndexes = + compile(BUILT_INDEXES, + "built column indexes", + "CREATE TABLE \"%s\" (" + + "table_name text," + + "index_name text," + + "PRIMARY KEY ((table_name), index_name)) " + + "WITH COMPACT STORAGE"); + + private static final CFMetaData Local = + compile(LOCAL, + "information about the local node", + "CREATE TABLE %s (" + + "key text," + + "bootstrapped text," ++ + "broadcast_address inet," + + "cluster_name text," + + "cql_version text," + + "data_center text," + + "gossip_generation int," + + "host_id uuid," + + "native_protocol_version text," + + "partitioner text," + + "rack text," + + "release_version text," ++ + "rpc_address inet," + + "schema_version uuid," + + "thrift_version text," + + "tokens set<varchar>," + + "truncated_at map<uuid, blob>," + + "PRIMARY KEY ((key)))"); + + private static final CFMetaData Peers = + compile(PEERS, + "information about known peers in the cluster", + "CREATE TABLE %s (" + + "peer inet," + + "data_center text," + + "host_id uuid," + + "preferred_ip inet," + + "rack text," + + "release_version text," + + "rpc_address inet," + + "schema_version uuid," + + "tokens set<varchar>," + + "PRIMARY KEY ((peer)))"); + + private static final CFMetaData PeerEvents = + compile(PEER_EVENTS, + "events related to peers", + "CREATE TABLE %s (" + + "peer inet," + + "hints_dropped map<uuid, int>," + + "PRIMARY KEY ((peer)))"); + + private static final CFMetaData RangeXfers = + compile(RANGE_XFERS, + "ranges requested for transfer", + "CREATE TABLE %s (" + + "token_bytes blob," + + "requested_at timestamp," + + "PRIMARY KEY ((token_bytes)))"); + + private static final CFMetaData CompactionsInProgress = + compile(COMPACTIONS_IN_PROGRESS, + "unfinished compactions", + "CREATE TABLE %s (" + + "id uuid," + + "columnfamily_name text," + + "inputs set<int>," + + "keyspace_name text," + + "PRIMARY KEY ((id)))"); + + private static final CFMetaData CompactionHistory = + compile(COMPACTION_HISTORY, + "week-long compaction history", + "CREATE TABLE %s (" + + "id uuid," + + "bytes_in bigint," + + "bytes_out bigint," + + "columnfamily_name text," + + "compacted_at timestamp," + + "keyspace_name text," + + "rows_merged map<int, bigint>," + + "PRIMARY KEY ((id)))") + .defaultTimeToLive((int) TimeUnit.DAYS.toSeconds(7)); + + private static final CFMetaData SSTableActivity = + compile(SSTABLE_ACTIVITY, + "historic sstable read rates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "columnfamily_name text," + + "generation int," + + "rate_120m double," + + "rate_15m double," + + "PRIMARY KEY ((keyspace_name, columnfamily_name, generation)))"); + + private static final CFMetaData SizeEstimates = + compile(SIZE_ESTIMATES, + "per-table primary range size estimates", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "range_start text," + + "range_end text," + + "mean_partition_size bigint," + + "partitions_count bigint," + + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") + .gcGraceSeconds(0); + + private static final CFMetaData AvailableRanges = + compile(AVAILABLE_RANGES, + "Available keyspace/ranges during bootstrap/replace that are ready to be served", + "CREATE TABLE %s (" + + "keyspace_name text PRIMARY KEY," + + "ranges set<blob>" + + ")"); + + private static CFMetaData compile(String name, String description, String schema) + { + return CFMetaData.compile(String.format(schema, name), NAME) + .comment(description); + } + + public static KSMetaData definition() + { + Iterable<CFMetaData> tables = + Iterables.concat(LegacySchemaTables.All, + Arrays.asList(BuiltIndexes, + Hints, + Batchlog, + Paxos, + Local, + Peers, + PeerEvents, + RangeXfers, + CompactionsInProgress, + CompactionHistory, + SSTableActivity, + SizeEstimates, + AvailableRanges)); + return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables); + } private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; @@@ -291,36 -110,92 +293,40 @@@ public static void finishStartup() { -- setupVersion(); - - migrateIndexInterval(); - migrateCachingOption(); - // add entries to system schema columnfamilies for the hardcoded system definitions - KSMetaData ksmd = Schema.instance.getKSMetaData(Keyspace.SYSTEM_KS); - - // delete old, possibly obsolete entries in schema columnfamilies - for (String cfname : Arrays.asList(SystemKeyspace.SCHEMA_KEYSPACES_CF, - SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, - SystemKeyspace.SCHEMA_COLUMNS_CF, - SystemKeyspace.SCHEMA_TRIGGERS_CF, - SystemKeyspace.SCHEMA_USER_TYPES_CF)) - executeOnceInternal(String.format("DELETE FROM system.%s WHERE keyspace_name = ?", cfname), ksmd.name); - - // (+1 to timestamp to make sure we don't get shadowed by the tombstones we just added) - ksmd.toSchema(FBUtilities.timestampMicros() + 1).apply(); - } - - private static void setupVersion() - { - String req = "INSERT INTO system.%s (key, release_version, cql_version, thrift_version, native_protocol_version, data_center, rack, partitioner, rpc_address, broadcast_address) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; ++ persistLocalMetadata(); + LegacySchemaTables.saveSystemKeyspaceSchema(); + } + - private static void setupVersion() ++ private static void persistLocalMetadata() + { + String req = "INSERT INTO system.%s (" + - " key, " + - " cluster_name, " + - " release_version, " + - " cql_version, " + - " thrift_version, " + - " native_protocol_version, " + - " data_center, " + - " rack, " + - " partitioner" + - ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; ++ "key," + ++ "cluster_name," + ++ "release_version," + ++ "cql_version," + ++ "thrift_version," + ++ "native_protocol_version," + ++ "data_center," + ++ "rack," + ++ "partitioner," + ++ "rpc_address," + ++ "broadcast_address" + ++ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - executeOnceInternal(String.format(req, LOCAL_CF), - LOCAL_KEY, + executeOnceInternal(String.format(req, LOCAL), + LOCAL, + DatabaseDescriptor.getClusterName(), FBUtilities.getReleaseVersionString(), QueryProcessor.CQL_VERSION.toString(), cassandraConstants.VERSION, String.valueOf(Server.CURRENT_VERSION), snitch.getDatacenter(FBUtilities.getBroadcastAddress()), snitch.getRack(FBUtilities.getBroadcastAddress()), - DatabaseDescriptor.getPartitioner().getClass().getName()); + DatabaseDescriptor.getPartitioner().getClass().getName(), + DatabaseDescriptor.getRpcAddress(), + FBUtilities.getBroadcastAddress()); } - // TODO: In 3.0, remove this and the index_interval column from system.schema_columnfamilies - /** Migrates index_interval values to min_index_interval and sets index_interval to null */ - private static void migrateIndexInterval() - { - for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF))) - { - if (!row.has("index_interval")) - continue; - - logger.debug("Migrating index_interval to min_index_interval"); - - CFMetaData table = CFMetaData.fromSchema(row); - String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF); - long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)"); - try - { - table.toSchema(timestamp).apply(); - } - catch (ConfigurationException e) - { - // shouldn't happen - } - } - } - - private static void migrateCachingOption() - { - for (UntypedResultSet.Row row : executeOnceInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF))) - { - if (!row.has("caching")) - continue; - - if (!CachingOptions.isLegacy(row.getString("caching"))) - continue; - try - { - CachingOptions caching = CachingOptions.fromString(row.getString("caching")); - CFMetaData table = CFMetaData.fromSchema(row); - logger.info("Migrating caching option {} to {} for {}.{}", row.getString("caching"), caching.toString(), table.ksName, table.cfName); - String query = String.format("SELECT writetime(type) FROM system.%s WHERE keyspace_name = ? AND columnfamily_name = ?", SCHEMA_COLUMNFAMILIES_CF); - long timestamp = executeOnceInternal(query, table.ksName, table.cfName).one().getLong("writetime(type)"); - table.toSchema(timestamp).apply(); - } - catch (ConfigurationException e) - { - // shouldn't happen - } - } - } - /** * Write compaction log, except columfamilies under system keyspace. *
