Updated Branches: refs/heads/trunk c2a8f1288 -> 11f7d7253
Adds binary protocol events for schema changes patch by slebresne; reviewed by jbellis for CASSANDRA-4684 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/11f7d725 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/11f7d725 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/11f7d725 Branch: refs/heads/trunk Commit: 11f7d72536c3acf2ecdb255ddbcee545f29d6742 Parents: c2a8f12 Author: Sylvain Lebresne <[email protected]> Authored: Fri Oct 5 19:46:10 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Oct 5 19:46:10 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol.spec | 28 +++++- .../cql3/statements/AlterKeyspaceStatement.java | 12 +++ .../cql3/statements/AlterTableStatement.java | 5 + .../statements/CreateColumnFamilyStatement.java | 6 + .../cql3/statements/CreateIndexStatement.java | 7 ++ .../cql3/statements/CreateKeyspaceStatement.java | 12 +++ .../cql3/statements/DropColumnFamilyStatement.java | 6 + .../cql3/statements/DropIndexStatement.java | 7 ++ .../cql3/statements/DropKeyspaceStatement.java | 12 +++ .../cql3/statements/SchemaAlteringStatement.java | 5 +- src/java/org/apache/cassandra/db/DefsTable.java | 16 +++ .../cassandra/service/IMigrationListener.java | 30 ++++++ .../apache/cassandra/service/MigrationManager.java | 53 ++++++++++ .../apache/cassandra/service/StorageService.java | 2 +- src/java/org/apache/cassandra/transport/Event.java | 48 +++++++++- .../org/apache/cassandra/transport/Server.java | 38 +++++++- .../transport/messages/ResultMessage.java | 74 ++++++++++++++- 18 files changed, 355 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7cfc311..cab1425 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -113,6 +113,7 @@ * Pluggable Thrift transport factories for CLI (CASSANDRA-4609) * Backport adding AlterKeyspace statement (CASSANDRA-4611) * (CQL3) Correcty accept upper-case data types (CASSANDRA-4770) + * Add binary protocol events for schema changes (CASSANDRA-4684) Merged from 1.0: * Switch from NBHM to CHM in MessagingService's callback map, which prevents OOM in long-running instances (CASSANDRA-4708) http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index 2e03a02..71a7c71 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -31,6 +31,7 @@ Table of Contents 4.2.5.2. Rows 4.2.5.3. Set_keyspace 4.2.5.4. Prepared + 4.2.5.5. Schema_change 4.2.6. EVENT 5. Compression 6. Collection types @@ -328,7 +329,8 @@ Table of Contents 0x0001 Void: for results carrying no information. 0x0002 Rows: for results to select queries, returning a set of rows. 0x0003 Set_keyspace: the result to a `use` query. - 0x0004 Prepared: result to a PREPARE message + 0x0004 Prepared: result to a PREPARE message. + 0x0005 Schema_change: the result to a schema altering query. The body for each kind (after the [int] kind) is defined below. @@ -416,6 +418,24 @@ Table of Contents - <id> is [short bytes] representing the prepared query ID. - <metadata> is defined exactly as for a Rows RESULT (See section 4.2.5.2). +4.2.5.5. Schema_change + + The result to a schema altering query (creation/update/drop of a + keyspace/table/index). The body (after the kind [int]) is composed of 3 + [string]: + <change><keyspace><table> + where: + - <change> describe the type of change that has occured. It can be one of + "CREATED", "UPDATED" or "DROPPED". + - <keyspace> is the name of the affected keyspace or the keyspace of the + affected table. + - <table> is the name of the affected table. <table> will be empty (i.e. + the empty string "") if the change was affecting a keyspace and not a + table. + + Note that queries to create and drop an index are considered as change + updating the table the index is on. + 4.2.6. EVENT @@ -434,6 +454,12 @@ Table of Contents consists of a [string] and an [inet], corresponding respectively to the type of status change ("UP" or "DOWN") followed by the address of the concerned node. + - "SCHEMA_CHANGE": events related to schema change. The body of the message + (after the event type) consists of 3 [string] corresponding respectively + to the type of schema change ("CREATED", "UPDATED" or "DROPPED"), + followed by the name of the affected keyspace and the name of the + affected table within that keyspace. For changes that affect a keyspace + directly, the table name will be empty (i.e. the empty string ""). All EVENT message have a streamId of -1 (Section 2.3). http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java index 13c9e44..52c422a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java @@ -28,6 +28,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.transport.messages.ResultMessage; public class AlterKeyspaceStatement extends SchemaAlteringStatement { @@ -41,6 +42,12 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement this.attrs = attrs; } + @Override + public String keyspace() + { + return name; + } + public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException { state.hasKeyspaceAccess(name, Permission.ALTER); @@ -83,4 +90,9 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement MigrationManager.announceKeyspaceUpdate(attrs.asKSMetadataUpdate(ksm)); } + + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.UPDATED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java index a33ff12..40eb8f8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.transport.messages.ResultMessage; import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily; @@ -195,4 +196,8 @@ public class AlterTableStatement extends SchemaAlteringStatement validator); } + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.UPDATED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java index 1cf3137..1775398 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateColumnFamilyStatement.java @@ -48,6 +48,7 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.transport.messages.ResultMessage; /** A <code>CREATE COLUMNFAMILY</code> parsed from a CQL query statement. */ public class CreateColumnFamilyStatement extends SchemaAlteringStatement @@ -100,6 +101,11 @@ public class CreateColumnFamilyStatement extends SchemaAlteringStatement MigrationManager.announceNewColumnFamily(getCFMetaData()); } + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.CREATED; + } + /** * Returns a CFMetaData instance based on the parameters parsed from this * <code>CREATE</code> statement, or defaults where applicable. http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java index fedbf27..710de11 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java @@ -37,6 +37,7 @@ import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.thrift.IndexType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.transport.messages.ResultMessage; /** A <code>CREATE INDEX</code> statement parsed from a CQL query. */ public class CreateIndexStatement extends SchemaAlteringStatement @@ -111,4 +112,10 @@ public class CreateIndexStatement extends SchemaAlteringStatement cfm.addDefaultIndexNames(); MigrationManager.announceColumnFamilyUpdate(cfm); } + + public ResultMessage.SchemaChange.Change changeType() + { + // Creating an index is akin to updating the CF + return ResultMessage.SchemaChange.Change.UPDATED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java index 5933292..378a8c7 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java @@ -34,6 +34,7 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.transport.messages.ResultMessage; /** A <code>CREATE KEYSPACE</code> statement parsed from a CQL query. */ public class CreateKeyspaceStatement extends SchemaAlteringStatement @@ -55,6 +56,12 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement this.attrs = attrs; } + @Override + public String keyspace() + { + return name; + } + public void checkAccess(ClientState state) throws UnauthorizedException, InvalidRequestException { state.hasKeyspaceAccess(name, Permission.CREATE); @@ -96,4 +103,9 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement { MigrationManager.announceNewKeyspace(attrs.asKSMetadata(name)); } + + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.CREATED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java index cba5944..7321642 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropColumnFamilyStatement.java @@ -24,6 +24,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.transport.messages.ResultMessage; public class DropColumnFamilyStatement extends SchemaAlteringStatement { @@ -41,4 +42,9 @@ public class DropColumnFamilyStatement extends SchemaAlteringStatement { MigrationManager.announceColumnFamilyDrop(keyspace(), columnFamily()); } + + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.DROPPED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java index 3ed391f..d7f966c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.*; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; +import org.apache.cassandra.transport.messages.ResultMessage; public class DropIndexStatement extends SchemaAlteringStatement { @@ -79,4 +80,10 @@ public class DropIndexStatement extends SchemaAlteringStatement return null; } + + public ResultMessage.SchemaChange.Change changeType() + { + // Dropping an index is akin to updating the CF + return ResultMessage.SchemaChange.Change.UPDATED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java index 79f23aa..710e750 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java @@ -25,6 +25,7 @@ import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.transport.messages.ResultMessage; public class DropKeyspaceStatement extends SchemaAlteringStatement { @@ -42,6 +43,12 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement } @Override + public String keyspace() + { + return keyspace; + } + + @Override public void validate(ClientState state) throws RequestValidationException { super.validate(state); @@ -52,4 +59,9 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement { MigrationManager.announceKeyspaceDrop(keyspace); } + + public ResultMessage.SchemaChange.Change changeType() + { + return ResultMessage.SchemaChange.Change.DROPPED; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index 36407b4..95fc473 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@ -69,6 +69,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL return new Prepared(this); } + public abstract ResultMessage.SchemaChange.Change changeType(); + public abstract void announceMigration() throws RequestValidationException; @Override @@ -80,6 +82,8 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL try { announceMigration(); + String tableName = cfName == null || columnFamily() == null ? "" : columnFamily(); + return new ResultMessage.SchemaChange(changeType(), keyspace(), tableName); } catch (ConfigurationException e) { @@ -87,7 +91,6 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL ex.initCause(e); throw ex; } - return null; } public ResultMessage executeInternal(ClientState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DefsTable.java b/src/java/org/apache/cassandra/db/DefsTable.java index e696ad5..a012168 100644 --- a/src/java/org/apache/cassandra/db/DefsTable.java +++ b/src/java/org/apache/cassandra/db/DefsTable.java @@ -44,6 +44,7 @@ import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.migration.avro.KsDef; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -486,7 +487,10 @@ public class DefsTable Schema.instance.load(ksm); if (!StorageService.instance.isClientMode()) + { Table.open(ksm.name); + MigrationManager.instance.notifyCreateKeyspace(ksm); + } } private static void addColumnFamily(CFMetaData cfm) @@ -504,7 +508,10 @@ public class DefsTable Schema.instance.setTableDefinition(ksm); if (!StorageService.instance.isClientMode()) + { Table.open(ksm.name).initCf(cfm.cfId, cfm.cfName, true); + MigrationManager.instance.notifyCreateColumnFamily(cfm); + } } private static void updateKeyspace(KSMetaData newState) @@ -518,7 +525,10 @@ public class DefsTable try { if (!StorageService.instance.isClientMode()) + { Table.open(newState.name).createReplicationStrategy(newKsm); + MigrationManager.instance.notifyUpdateKeyspace(newKsm); + } } catch (ConfigurationException e) { @@ -537,6 +547,7 @@ public class DefsTable { Table table = Table.open(cfm.ksName); table.getColumnFamilyStore(cfm.cfName).reload(); + MigrationManager.instance.notifyUpdateColumnFamily(cfm); } } @@ -565,6 +576,10 @@ public class DefsTable // remove the table from the static instances. Table.clear(ksm.name); Schema.instance.clearTableDefinition(ksm); + if (!StorageService.instance.isClientMode()) + { + MigrationManager.instance.notifyDropKeyspace(ksm); + } } private static void dropColumnFamily(String ksName, String cfName) throws IOException @@ -587,6 +602,7 @@ public class DefsTable if (DatabaseDescriptor.isAutoSnapshot()) cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily)); Table.open(ksm.name).dropCf(cfm.cfId); + MigrationManager.instance.notifyDropColumnFamily(cfm); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/IMigrationListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/IMigrationListener.java b/src/java/org/apache/cassandra/service/IMigrationListener.java new file mode 100644 index 0000000..1a6854b --- /dev/null +++ b/src/java/org/apache/cassandra/service/IMigrationListener.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +public interface IMigrationListener +{ + public void onCreateKeyspace(String ksName); + public void onCreateColumnFamly(String ksName, String cfName); + + public void onUpdateKeyspace(String ksName); + public void onUpdateColumnFamly(String ksName, String cfName); + + public void onDropKeyspace(String ksName); + public void onDropColumnFamly(String ksName, String cfName); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/MigrationManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index f7ba701..d1987c2 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -22,6 +22,7 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.ArrayList; @@ -55,6 +56,22 @@ public class MigrationManager implements IEndpointStateChangeSubscriber private static final ByteBuffer LAST_MIGRATION_KEY = ByteBufferUtil.bytes("Last Migration"); + public static final MigrationManager instance = new MigrationManager(); + + private final List<IMigrationListener> listeners = new CopyOnWriteArrayList<IMigrationListener>(); + + private MigrationManager() {} + + public void register(IMigrationListener listener) + { + listeners.add(listener); + } + + public void unregister(IMigrationListener listener) + { + listeners.remove(listener); + } + public void onJoin(InetAddress endpoint, EndpointState epState) {} @@ -107,6 +124,42 @@ public class MigrationManager implements IEndpointStateChangeSubscriber return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0; } + public void notifyCreateKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onCreateKeyspace(ksm.name); + } + + public void notifyCreateColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onCreateColumnFamly(cfm.ksName, cfm.cfName); + } + + public void notifyUpdateKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onUpdateKeyspace(ksm.name); + } + + public void notifyUpdateColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onUpdateColumnFamly(cfm.ksName, cfm.cfName); + } + + public void notifyDropKeyspace(KSMetaData ksm) + { + for (IMigrationListener listener : listeners) + listener.onDropKeyspace(ksm.name); + } + + public void notifyDropColumnFamily(CFMetaData cfm) + { + for (IMigrationListener listener : listeners) + listener.onDropColumnFamly(cfm.ksName, cfm.cfName); + } + public static void announceNewKeyspace(KSMetaData ksm) throws ConfigurationException { ksm.validate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 47c4c92..7d92fbe 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -166,7 +166,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe private static enum Mode { NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED, RELOCATING } private Mode operationMode; - private final MigrationManager migrationManager = new MigrationManager(); + private final MigrationManager migrationManager = MigrationManager.instance; /* Used for tracking drain progress */ private volatile int totalCFs, remainingCFs; http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index 849caff..855049d 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -25,7 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers; public abstract class Event { - public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE } + public enum Type { TOPOLOGY_CHANGE, STATUS_CHANGE, SCHEMA_CHANGE } public final Type type; @@ -42,6 +42,8 @@ public abstract class Event return TopologyChange.deserializeEvent(cb); case STATUS_CHANGE: return StatusChange.deserializeEvent(cb); + case SCHEMA_CHANGE: + return SchemaChange.deserializeEvent(cb); } throw new AssertionError(); } @@ -143,4 +145,48 @@ public abstract class Event return status + " " + node; } } + + public static class SchemaChange extends Event + { + public enum Change { CREATED, UPDATED, DROPPED } + + public final Change change; + public final String keyspace; + public final String table; + + public SchemaChange(Change change, String keyspace, String table) + { + super(Type.SCHEMA_CHANGE); + this.change = change; + this.keyspace = keyspace; + this.table = table; + } + + public SchemaChange(Change change, String keyspace) + { + this(change, keyspace, ""); + } + + // Assumes the type has already by been deserialized + private static SchemaChange deserializeEvent(ChannelBuffer cb) + { + Change change = Enum.valueOf(Change.class, CBUtil.readString(cb).toUpperCase()); + String keyspace = CBUtil.readString(cb); + String table = CBUtil.readString(cb); + return new SchemaChange(change, keyspace, table); + } + + protected ChannelBuffer serializeEvent() + { + return ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(change.toString()), + CBUtil.stringToCB(keyspace), + CBUtil.stringToCB(table)); + } + + @Override + public String toString() + { + return change + " " + keyspace + (table.isEmpty() ? "" : "." + table); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index ab91b19..e820554 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.gms.*; import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.IMigrationListener; +import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.EventMessage; @@ -60,7 +62,9 @@ public class Server implements CassandraDaemon.Server public Server(InetSocketAddress socket) { this.socket = socket; - Gossiper.instance.register(new EventNotifier(this)); + EventNotifier notifier = new EventNotifier(this); + Gossiper.instance.register(notifier); + MigrationManager.instance.register(notifier); } public Server(String hostname, int port) @@ -199,7 +203,7 @@ public class Server implements CassandraDaemon.Server } } - private static class EventNotifier implements IEndpointStateChangeSubscriber + private static class EventNotifier implements IEndpointStateChangeSubscriber, IMigrationListener { private final Server server; @@ -251,5 +255,35 @@ public class Server implements CassandraDaemon.Server { server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } + + public void onCreateKeyspace(String ksName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName)); + } + + public void onCreateColumnFamly(String ksName, String cfName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName, cfName)); + } + + public void onUpdateKeyspace(String ksName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); + } + + public void onUpdateColumnFamly(String ksName, String cfName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName, cfName)); + } + + public void onDropKeyspace(String ksName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); + } + + public void onDropColumnFamly(String ksName, String cfName) + { + server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName, cfName)); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/11f7d725/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index 95fd333..955abc6 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -56,7 +56,8 @@ public abstract class ResultMessage extends Message.Response VOID (1, Void.subcodec), ROWS (2, Rows.subcodec), SET_KEYSPACE (3, SetKeyspace.subcodec), - PREPARED (4, Prepared.subcodec); + PREPARED (4, Prepared.subcodec), + SCHEMA_CHANGE(5, SchemaChange.subcodec); public final int id; public final Message.Codec<ResultMessage> subcodec; @@ -314,4 +315,75 @@ public abstract class ResultMessage extends Message.Response return "RESULT PREPARED " + statementId + " " + metadata; } } + + public static class SchemaChange extends ResultMessage + { + public enum Change { CREATED, UPDATED, DROPPED } + + public final Change change; + public final String keyspace; + public final String columnFamily; + + public SchemaChange(Change change, String keyspace) + { + this(change, keyspace, ""); + } + + public SchemaChange(Change change, String keyspace, String columnFamily) + { + super(Kind.SCHEMA_CHANGE); + this.change = change; + this.keyspace = keyspace; + this.columnFamily = columnFamily; + } + + public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>() + { + public ResultMessage decode(ChannelBuffer body) + { + String cStr = CBUtil.readString(body); + Change change = null; + try + { + change = Enum.valueOf(Change.class, cStr.toUpperCase()); + } + catch (IllegalStateException e) + { + throw new ProtocolException("Unknown Schema change action: " + cStr); + } + + String keyspace = CBUtil.readString(body); + String columnFamily = CBUtil.readString(body); + return new SchemaChange(change, keyspace, columnFamily); + + } + + public ChannelBuffer encode(ResultMessage msg) + { + assert msg instanceof SchemaChange; + SchemaChange scm = (SchemaChange)msg; + + ChannelBuffer a = CBUtil.stringToCB(scm.change.toString()); + ChannelBuffer k = CBUtil.stringToCB(scm.keyspace); + ChannelBuffer c = CBUtil.stringToCB(scm.columnFamily); + return ChannelBuffers.wrappedBuffer(a, k, c); + } + }; + + protected ChannelBuffer encodeBody() + { + return subcodec.encode(this); + } + + public CqlResult toThriftResult() + { + return new CqlResult(CqlResultType.VOID); + } + + @Override + public String toString() + { + return "RESULT schema change " + change + " on " + keyspace + (columnFamily.isEmpty() ? "" : "." + columnFamily); + } + } }
