merge from 1.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd33330b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd33330b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd33330b Branch: refs/heads/cassandra-1.2 Commit: bd33330b9ff745322e2e3030e4920ee4eef6a47d Parents: 8dd1d56 dd1633b Author: Pavel Yaskevich <pyaskev...@twitter.com> Authored: Fri Nov 16 15:05:34 2012 -0800 Committer: Pavel Yaskevich <pyaskev...@twitter.com> Committed: Fri Nov 16 15:05:34 2012 -0800 ---------------------------------------------------------------------- CHANGES.txt | 3 ++ .../cassandra/db/DefinitionsUpdateVerbHandler.java | 4 +- src/java/org/apache/cassandra/db/DefsTable.java | 26 +++++++++++--- src/java/org/apache/cassandra/db/RowMutation.java | 3 ++ .../org/apache/cassandra/net/MessagingService.java | 8 ++-- .../apache/cassandra/service/MigrationManager.java | 9 +++-- .../org/apache/cassandra/cli/CliHelp.yaml | 2 +- 7 files changed, 38 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index c499919,2ed9666..b935425 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -18,64 -2,7 +18,67 @@@ Merged from 1.1 * reset getRangeSlice filter after finishing a row for get_paged_slice (CASSANDRA-4919) * expunge row cache post-truncate (CASSANDRA-4940) + * remove IAuthority2 (CASSANDRA-4875) ++ * Allow static CF definition with compact storage (CASSANDRA-4910) ++ * Fix endless loop/compaction of schema_* CFs due to broken timestamps (CASSANDRA-4880) + + +1.2-beta2 + * fp rate of 1.0 disables BF entirely; LCS defaults to 1.0 (CASSANDRA-4876) + * off-heap bloom filters for row keys (CASSANDRA_4865) + * add extension point for sstable components (CASSANDRA-4049) + * improve tracing output (CASSANDRA-4852, 4862) + * make TRACE verb droppable (CASSANDRA-4672) + * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755) + * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793) + * Make hint delivery asynchronous (CASSANDRA-4761) + * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610) + * cassandra-cli: allow Double value type to be inserted to a column (CASSANDRA-4661) + * Add ability to use custom TServerFactory implementations (CASSANDRA-4608) + * optimize batchlog flushing to skip successful batches (CASSANDRA-4667) + * include metadata for system keyspace itself in schema tables (CASSANDRA-4416) + * add check to PropertyFileSnitch to verify presence of location for + local node (CASSANDRA-4728) + * add PBSPredictor consistency modeler (CASSANDRA-4261) + * remove vestiges of Thrift unframed mode (CASSANDRA-4729) + * optimize single-row PK lookups (CASSANDRA-4710) + * adjust blockFor calculation to account for pending ranges due to node + movement (CASSANDRA-833) + * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649) + * (CQL3) Make prepared statement global instead of per connection + (CASSANDRA-4449) + * Fix scrubbing of CQL3 created tables (CASSANDRA-4685) + * (CQL3) Fix validation when using counter and regular columns in the same + table (CASSANDRA-4706) + * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648) + * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738) + * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) + * Support repairing only the local DC nodes (CASSANDRA-4747) + * Use rpc_address for binary protocol and change default port (CASSANRA-4751) + * Fix use of collections in prepared statements (CASSANDRA-4739) + * Store more information into peers table (CASSANDRA-4351, 4814) + * Configurable bucket size for size tiered compaction (CASSANDRA-4704) + * Run leveled compaction in parallel (CASSANDRA-4310) + * Fix potential NPE during CFS reload (CASSANDRA-4786) + * Composite indexes may miss results (CASSANDRA-4796) + * Move consistency level to the protocol level (CASSANDRA-4734, 4824) + * Fix Subcolumn slice ends not respected (CASSANDRA-4826) + * Fix Assertion error in cql3 select (CASSANDRA-4783) + * Fix list prepend logic (CQL3) (CASSANDRA-4835) + * Add booleans as literals in CQL3 (CASSANDRA-4776) + * Allow renaming PK columns in CQL3 (CASSANDRA-4822) + * Fix binary protocol NEW_NODE event (CASSANDRA-4679) + * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781) + * Remove system tables accounting from schema (CASSANDRA-4850) + * Force provided columns in clustering key order in 'CLUSTERING ORDER BY' (CASSANDRA-4881) + * Fix composite index bug (CASSANDRA-4884) + * Fix short read protection for CQL3 (CASSANDRA-4882) + * Add tracing support to the binary protocol (CASSANDRA-4699) + * Don't allow prepared marker inside collections (CASSANDRA-4890) + * Re-allow order by on non-selected columns (CASSANDRA-4645) + * Bug when composite index is created in a table having collections (CASSANDRA-4909) + * log index scan subject in CompositesSearcher (CASSANDRA-4904) +Merged from 1.1: * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859) * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816) * fix indexing empty column values (CASSANDRA-4832) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java index 5c85530,6da1517..fdce853 --- a/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java +++ b/src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java @@@ -47,12 -45,7 +47,12 @@@ public class DefinitionsUpdateVerbHandl { public void runMayThrow() throws Exception { - if (message.version < MessagingService.VERSION_11) - DefsTable.mergeRemoteSchema(message.getMessageBody(), message.getVersion()); ++ if (message.version < MessagingService.VERSION_117) + { - logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1, please upgrade first"); ++ logger.error("Can't accept schema migrations from Cassandra versions previous to 1.1.7, please upgrade first"); + return; + } + DefsTable.mergeSchema(message.payload); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/DefsTable.java index f1b2b4f,4d6b574..8569553 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@@ -167,12 -169,12 +166,12 @@@ public class DefsTabl fixSchemaNanoTimestamp(SystemTable.SCHEMA_COLUMNS_CF); } - private static void fixSchemaNanoTimestamp(String columnFamily) throws IOException + private static void fixSchemaNanoTimestamp(String columnFamily) { - ColumnFamilyStore cfs = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(columnFamily); + ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(columnFamily); boolean needsCleanup = false; - long timestamp = FBUtilities.timestampMicros(); + Date now = new Date(); List<Row> rows = SystemTable.serializedSchema(columnFamily); @@@ -212,12 -227,14 +224,14 @@@ throw new AssertionError(e); } + long microTimestamp = now.getTime() * 1000; + for (Row row : rows) { - if (invalidSchemaRow(row)) + if (Schema.invalidSchemaRow(row)) continue; - RowMutation mutation = new RowMutation(Table.SYSTEM_TABLE, row.key.key); + RowMutation mutation = new RowMutation(Table.SYSTEM_KS, row.key.key); for (IColumn column : row.cf.columns) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/net/MessagingService.java index d5aae5c,7974e6c..cce3925 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@@ -69,147 -65,18 +69,147 @@@ public final class MessagingService imp public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; // 8 bits version, so don't waste versions - // We are no longer compatible with versions older than 1.0 - public static final int VERSION_10 = 3; - public static final int VERSION_11 = 4; - public static final int VERSION_12 = 5; - public static final int VERSION_07 = 1; - public static final int VERSION_080 = 2; + public static final int VERSION_10 = 3; + public static final int VERSION_11 = 4; + public static final int VERSION_117 = 5; ++ public static final int VERSION_12 = 6; + public static final int current_version = VERSION_12; - public static final int version_ = VERSION_117; + /** + * we preface every message with this number so the recipient can validate the sender is sane + */ + static final int PROTOCOL_MAGIC = 0xCA552DFA; - static SerializerType serializerType_ = SerializerType.BINARY; + /* All verb handler identifiers */ + public enum Verb + { + MUTATION, + @Deprecated BINARY, + READ_REPAIR, + READ, + REQUEST_RESPONSE, // client-initiated reads and writes + @Deprecated STREAM_INITIATE, + @Deprecated STREAM_INITIATE_DONE, + STREAM_REPLY, + STREAM_REQUEST, + RANGE_SLICE, + BOOTSTRAP_TOKEN, + TREE_REQUEST, + TREE_RESPONSE, + @Deprecated JOIN, + GOSSIP_DIGEST_SYN, + GOSSIP_DIGEST_ACK, + GOSSIP_DIGEST_ACK2, + @Deprecated DEFINITIONS_ANNOUNCE, + DEFINITIONS_UPDATE, + TRUNCATE, + SCHEMA_CHECK, + @Deprecated INDEX_SCAN, + REPLICATION_FINISHED, + INTERNAL_RESPONSE, // responses to internal calls + COUNTER_MUTATION, + STREAMING_REPAIR_REQUEST, + STREAMING_REPAIR_RESPONSE, + SNAPSHOT, // Similar to nt snapshot + MIGRATION_REQUEST, + GOSSIP_SHUTDOWN, + _TRACE, // dummy verb so we can use MS.droppedMessages + // use as padding for backwards compatability where a previous version needs to validate a verb from the future. + UNUSED_1, + UNUSED_2, + UNUSED_3, + ; + // remember to add new verbs at the end, since we serialize by ordinal + } + + public static final Verb[] VERBS = Verb.values(); + + public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class) + {{ + put(Verb.MUTATION, Stage.MUTATION); + put(Verb.BINARY, Stage.MUTATION); + put(Verb.READ_REPAIR, Stage.MUTATION); + put(Verb.TRUNCATE, Stage.MUTATION); + put(Verb.READ, Stage.READ); + put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE); + put(Verb.STREAM_REPLY, Stage.MISC); // actually handled by FileStreamTask and streamExecutors + put(Verb.STREAM_REQUEST, Stage.MISC); + put(Verb.RANGE_SLICE, Stage.READ); + put(Verb.BOOTSTRAP_TOKEN, Stage.MISC); + put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY); + put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY); + put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY); + put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY); + put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP); + put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP); + put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP); + put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP); + put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION); + put(Verb.SCHEMA_CHECK, Stage.MIGRATION); + put(Verb.MIGRATION_REQUEST, Stage.MIGRATION); + put(Verb.INDEX_SCAN, Stage.READ); + put(Verb.REPLICATION_FINISHED, Stage.MISC); + put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE); + put(Verb.COUNTER_MUTATION, Stage.MUTATION); + put(Verb.SNAPSHOT, Stage.MISC); + put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE); + put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE); + put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE); + }}; - /** we preface every message with this number so the recipient can validate the sender is sane */ - static final int PROTOCOL_MAGIC = 0xCA552DFA; + /** + * Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it is. + * Most of the time, this is enough to determine how to deserialize the message payload. + * The exception is the REQUEST_RESPONSE verb, which just means "a reply to something you told me to do." + * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and + * handled the deserialization itself. Now that we do that in ITC, to avoid the extra copy to an + * intermediary byte[] (See CASSANDRA-3716), we need to wire that up to the CallbackInfo object + * (see below). + */ + public static final EnumMap<Verb, IVersionedSerializer<?>> verbSerializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class) + {{ + put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance); + put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance); + + put(Verb.MUTATION, RowMutation.serializer); + put(Verb.READ_REPAIR, RowMutation.serializer); + put(Verb.READ, ReadCommand.serializer); + put(Verb.STREAM_REPLY, StreamReply.serializer); + put(Verb.STREAM_REQUEST, StreamRequest.serializer); + put(Verb.RANGE_SLICE, RangeSliceCommand.serializer); + put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance); + put(Verb.TREE_REQUEST, AntiEntropyService.TreeRequest.serializer); + put(Verb.TREE_RESPONSE, AntiEntropyService.Validator.serializer); + put(Verb.STREAMING_REPAIR_REQUEST, StreamingRepairTask.serializer); + put(Verb.STREAMING_REPAIR_RESPONSE, UUIDSerializer.serializer); + put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer); + put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer); + put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer); + put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance); + put(Verb.TRUNCATE, Truncation.serializer); + put(Verb.INDEX_SCAN, IndexScanCommand.serializer); + put(Verb.REPLICATION_FINISHED, null); + put(Verb.COUNTER_MUTATION, CounterMutation.serializer); + }}; + + /** + * A Map of what kind of serializer to wire up to a REQUEST_RESPONSE callback, based on outbound Verb. + */ + public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class) + {{ + put(Verb.MUTATION, WriteResponse.serializer); + put(Verb.READ_REPAIR, WriteResponse.serializer); + put(Verb.COUNTER_MUTATION, WriteResponse.serializer); + put(Verb.RANGE_SLICE, RangeSliceReply.serializer); + put(Verb.READ, ReadResponse.serializer); + put(Verb.TRUNCATE, TruncateResponse.serializer); + put(Verb.SNAPSHOT, null); + + put(Verb.MIGRATION_REQUEST, MigrationManager.MigrationsSerializer.instance); + put(Verb.SCHEMA_CHECK, UUIDSerializer.serializer); + put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance); + put(Verb.REPLICATION_FINISHED, null); + }}; /* This records all the results mapped by message Id */ private final ExpiringMap<String, CallbackInfo> callbacks; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/MigrationManager.java index d1987c2,973b190..76250a4 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@@ -102,8 -94,8 +102,8 @@@ public class MigrationManager implement private static void rectifySchema(UUID theirVersion, final InetAddress endpoint) { - // Can't request migrations from nodes with versions younger than 1.1 - if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_11) + // Can't request migrations from nodes with versions younger than 1.1.7 - if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_117) ++ if (MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_117) return; if (Schema.instance.getVersion().equals(theirVersion)) @@@ -317,11 -341,12 +317,12 @@@ liveEndpoints.remove(FBUtilities.getBroadcastAddress()); // force migration is there are nodes around, first of all - // check if there are nodes with versions >= 1.1 to request migrations from, + // check if there are nodes with versions >= 1.1.7 to request migrations from, // because migration format of the nodes with versions < 1.1 is incompatible with older versions + // and due to broken timestamps in versions prior to 1.1.7 for (InetAddress node : liveEndpoints) { - if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_11) - if (Gossiper.instance.getVersion(node) >= MessagingService.VERSION_117) ++ if (MessagingService.instance().getVersion(node) >= MessagingService.VERSION_117) { if (logger.isDebugEnabled()) logger.debug("Requesting schema from " + node); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd33330b/src/resources/org/apache/cassandra/cli/CliHelp.yaml ----------------------------------------------------------------------