Updated Branches: refs/heads/trunk 86a7a3d1a -> 8142a2fe9
Add snapshot command to trigger snapshot on remote node

patch by vijay; reviewed by slebresne for CASSANDRA-3721 (first part)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8142a2fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8142a2fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8142a2fe

Branch: refs/heads/trunk
Commit: 8142a2fe93d1ac517e2056b61d4c53c6a4da57ea
Parents: 86a7a3d
Author: Sylvain Lebresne <[email protected]>
Authored: Wed Jan 25 11:58:47 2012 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Jan 25 12:02:49 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/SnapshotCommand.java   |   81 +++++++++++++++
 .../cassandra/service/SnapshotVerbHandler.java     |   52 +++++++++
 .../apache/cassandra/service/StorageService.java   |    4 +
 4 files changed, 138 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07b6213..c0ea71c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
  * Fix BulkLoader to support new SSTable layout and add stream
    throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
  * Allow concurrent schema migrations (CASSANDRA-1391)
+ * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java b/src/java/org/apache/cassandra/db/SnapshotCommand.java
new file mode 100644
index 0000000..2b49874
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -0,0 +1,81 @@
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SnapshotCommand implements MessageProducer
+{
+    private static final SnapshotCommandSerializer serializer = new SnapshotCommandSerializer();
+
+    public final String keyspace;
+    public final String column_family;
+    public final String snapshot_name;
+    public final boolean clear_snapshot;
+
+    public SnapshotCommand(String keyspace, String columnFamily, String snapshotName, boolean clearSnapshot)
+    {
+        this.keyspace = keyspace;
+        this.column_family = columnFamily;
+        this.snapshot_name = snapshotName;
+        this.clear_snapshot = clearSnapshot;
+    }
+
+    public Message getMessage(Integer version) throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        serializer.serialize(this, dob, version);
+        return new Message(FBUtilities.getBroadcastAddress(), StorageService.Verb.SNAPSHOT, Arrays.copyOf(dob.getData(), dob.getLength()), version);
+    }
+
+    public static SnapshotCommand read(Message message) throws IOException
+    {
+        byte[] bytes = message.getMessageBody();
+        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis), message.getVersion());
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SnapshotCommand{" + "keyspace='" + keyspace + '\'' +
+                                  ", column_family='" + column_family + '\'' +
+                                  ", snapshot_name=" + snapshot_name +
+                                  ", clear_snapshot=" + clear_snapshot + '}';
+    }
+}
+
+class SnapshotCommandSerializer implements IVersionedSerializer<SnapshotCommand>
+{
+    public void serialize(SnapshotCommand snapshot_command, DataOutput dos, int version) throws IOException
+    {
+        dos.writeUTF(snapshot_command.keyspace);
+        dos.writeUTF(snapshot_command.column_family);
+        dos.writeUTF(snapshot_command.snapshot_name);
+        dos.writeBoolean(snapshot_command.clear_snapshot);
+    }
+
+    public SnapshotCommand deserialize(DataInput dis, int version) throws IOException
+    {
+        String keyspace = dis.readUTF();
+        String column_family = dis.readUTF();
+        String snapshot_name = dis.readUTF();
+        boolean clear_snapshot = dis.readBoolean();
+        return new SnapshotCommand(keyspace, column_family, snapshot_name, clear_snapshot);
+    }
+
+    public long serializedSize(SnapshotCommand snapshot_command, int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
new file mode 100644
index 0000000..1f22cba
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(SnapshotVerbHandler.class);
+    public void doVerb(Message message, String id)
+    {
+        try
+        {
+            SnapshotCommand command = SnapshotCommand.read(message);
+            if (command.clear_snapshot)
+                Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
+            else
+                Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
+            Message response = message.getReply(FBUtilities.getBroadcastAddress(), new byte[0], MessagingService.version_);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending response to snapshot request {} to {} ", command.snapshot_name, message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ca7cc5e..c6d03b1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -112,6 +112,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         COUNTER_MUTATION,
         STREAMING_REPAIR_REQUEST,
         STREAMING_REPAIR_RESPONSE,
+        SNAPSHOT, // Similar to nt snapshot
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -147,6 +148,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
+        put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
@@ -289,6 +291,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, new SchemaCheckVerbHandler());
         MessagingService.instance().registerVerbHandlers(Verb.MIGRATION_REQUEST, new MigrationRequestVerbHandler());
 
+        MessagingService.instance().registerVerbHandlers(Verb.SNAPSHOT, new SnapshotVerbHandler());
+
         // spin up the streaming service so it is available for jmx tools.
         if (StreamingService.instance == null)
             throw new RuntimeException("Streaming service is unavailable.");
