Updated Branches:
  refs/heads/trunk 86a7a3d1a -> 8142a2fe9

Add snapshot command to trigger snapshot on remote node

patch by vijay; reviewed by slebresne for CASSANDRA-3721 (first part)


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8142a2fe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8142a2fe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8142a2fe

Branch: refs/heads/trunk
Commit: 8142a2fe93d1ac517e2056b61d4c53c6a4da57ea
Parents: 86a7a3d
Author: Sylvain Lebresne <[email protected]>
Authored: Wed Jan 25 11:58:47 2012 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Jan 25 12:02:49 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/db/SnapshotCommand.java   |   81 +++++++++++++++
 .../cassandra/service/SnapshotVerbHandler.java     |   52 +++++++++
 .../apache/cassandra/service/StorageService.java   |    4 +
 4 files changed, 138 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 07b6213..c0ea71c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -52,6 +52,7 @@
  * Fix BulkLoader to support new SSTable layout and add stream
    throttling to prevent an NPE when there is no yaml config (CASSANDRA-3752)
  * Allow concurrent schema migrations (CASSANDRA-1391)
+ * Add SnapshotCommand to trigger snapshot on remote node (CASSANDRA-3721)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/db/SnapshotCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SnapshotCommand.java 
b/src/java/org/apache/cassandra/db/SnapshotCommand.java
new file mode 100644
index 0000000..2b49874
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SnapshotCommand.java
@@ -0,0 +1,81 @@
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SnapshotCommand implements MessageProducer
+{
+    private static final SnapshotCommandSerializer serializer = new 
SnapshotCommandSerializer();
+
+    public final String keyspace;
+    public final String column_family;
+    public final String snapshot_name;
+    public final boolean clear_snapshot;
+
+    public SnapshotCommand(String keyspace, String columnFamily, String 
snapshotName, boolean clearSnapshot)
+    {
+        this.keyspace = keyspace;
+        this.column_family = columnFamily;
+        this.snapshot_name = snapshotName;
+        this.clear_snapshot = clearSnapshot;
+    }
+
+    public Message getMessage(Integer version) throws IOException
+    {
+        DataOutputBuffer dob = new DataOutputBuffer();
+        serializer.serialize(this, dob, version);
+        return new Message(FBUtilities.getBroadcastAddress(), 
StorageService.Verb.SNAPSHOT, Arrays.copyOf(dob.getData(), dob.getLength()), 
version);
+    }
+
+    public static SnapshotCommand read(Message message) throws IOException
+    {
+        byte[] bytes = message.getMessageBody();
+        FastByteArrayInputStream bis = new FastByteArrayInputStream(bytes);
+        return serializer.deserialize(new DataInputStream(bis), 
message.getVersion());
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SnapshotCommand{" + "keyspace='" + keyspace + '\'' +
+                                  ", column_family='" + column_family + '\'' +
+                                  ", snapshot_name=" + snapshot_name +
+                                  ", clear_snapshot=" + clear_snapshot + '}';
+    }
+}
+
+class SnapshotCommandSerializer implements 
IVersionedSerializer<SnapshotCommand>
+{
+    public void serialize(SnapshotCommand snapshot_command, DataOutput dos, 
int version) throws IOException
+    {
+        dos.writeUTF(snapshot_command.keyspace);
+        dos.writeUTF(snapshot_command.column_family);
+        dos.writeUTF(snapshot_command.snapshot_name);
+        dos.writeBoolean(snapshot_command.clear_snapshot);
+    }
+
+    public SnapshotCommand deserialize(DataInput dis, int version) throws 
IOException
+    {
+        String keyspace = dis.readUTF();
+        String column_family = dis.readUTF();
+        String snapshot_name = dis.readUTF();
+        boolean clear_snapshot = dis.readBoolean();
+        return new SnapshotCommand(keyspace, column_family, snapshot_name, 
clear_snapshot);
+    }
+
+    public long serializedSize(SnapshotCommand snapshot_command, int version)
+    {
+        throw new UnsupportedOperationException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java 
b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
new file mode 100644
index 0000000..1f22cba
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/SnapshotVerbHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnapshotVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SnapshotVerbHandler.class);
+    public void doVerb(Message message, String id)
+    {
+        try
+        {
+            SnapshotCommand command = SnapshotCommand.read(message);
+            if (command.clear_snapshot)
+                
Table.open(command.keyspace).clearSnapshot(command.snapshot_name);
+            else
+                
Table.open(command.keyspace).getColumnFamilyStore(command.column_family).snapshot(command.snapshot_name);
+            Message response = 
message.getReply(FBUtilities.getBroadcastAddress(), new byte[0], 
MessagingService.version_);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending response to snapshot request {} to {} ", 
command.snapshot_name, message.getFrom());
+            MessagingService.instance().sendReply(response, id, 
message.getFrom());
+        }
+        catch (Exception ex)
+        {
+            throw new RuntimeException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8142a2fe/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index ca7cc5e..c6d03b1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -112,6 +112,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         COUNTER_MUTATION,
         STREAMING_REPAIR_REQUEST,
         STREAMING_REPAIR_RESPONSE,
+        SNAPSHOT, // Similar to nodetool snapshot
         // use as padding for backwards compatibility where a previous version 
needs to validate a verb from the future.
         UNUSED_1,
         UNUSED_2,
@@ -147,6 +148,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
         put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
         put(Verb.COUNTER_MUTATION, Stage.MUTATION);
+        put(Verb.SNAPSHOT, Stage.MISC);
         put(Verb.UNUSED_1, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
         put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
@@ -289,6 +291,8 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
         MessagingService.instance().registerVerbHandlers(Verb.SCHEMA_CHECK, 
new SchemaCheckVerbHandler());
         
MessagingService.instance().registerVerbHandlers(Verb.MIGRATION_REQUEST, new 
MigrationRequestVerbHandler());
 
+        MessagingService.instance().registerVerbHandlers(Verb.SNAPSHOT, new 
SnapshotVerbHandler());
+
         // spin up the streaming service so it is available for jmx tools.
         if (StreamingService.instance == null)
             throw new RuntimeException("Streaming service is unavailable.");

Reply via email to