Author: brandonwilliams
Date: Wed Jan  5 20:16:14 2011
New Revision: 1055618

URL: http://svn.apache.org/viewvc?rev=1055618&view=rev
Log:
Distributed test harness.  Patch by Kelvin Kakugawa, Stu Hood, and Ryan
King, reviewed by brandonwilliams for CASSANDRA-1859.

Added:
    cassandra/branches/cassandra-0.7/test/distributed/
    cassandra/branches/cassandra-0.7/test/distributed/README.txt
    cassandra/branches/cassandra-0.7/test/distributed/ivy.xml
      - copied, changed from r1055594, 
cassandra/branches/cassandra-0.7/ivysettings.xml
    cassandra/branches/cassandra-0.7/test/distributed/org/
    cassandra/branches/cassandra-0.7/test/distributed/org/apache/
    cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/CassandraServiceController.java
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MovementTest.java
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/TestBase.java
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/BlobUtils.java
    
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/KeyPair.java
    cassandra/branches/cassandra-0.7/test/resources/whirr-default.properties
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/build.xml
    cassandra/branches/cassandra-0.7/ivysettings.xml

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1055618&r1=1055617&r2=1055618&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Wed Jan  5 20:16:14 2011
@@ -12,6 +12,7 @@ dev
  * implement describeOwnership for BOP, COPP (CASSANDRA-1928)
  * make read repair behave as expected for ConsistencyLevel > ONE
    (CASSANDRA-982)
+ * distributed test harness (CASSANDRA-1859)
 
 
 0.7.0-rc4

Modified: cassandra/branches/cassandra-0.7/build.xml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/build.xml?rev=1055618&r1=1055617&r2=1055618&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/build.xml (original)
+++ cassandra/branches/cassandra-0.7/build.xml Wed Jan  5 20:16:14 2011
@@ -40,12 +40,14 @@
     <property name="interface.avro.dir" value="${interface.dir}/avro"/>
     <property name="test.dir" value="${basedir}/test"/>
     <property name="test.resources" value="${test.dir}/resources"/>
+    <property name="test.lib" value="${build.dir}/test/lib"/>
     <property name="test.classes" value="${build.dir}/test/classes"/>
     <property name="test.conf" value="${test.dir}/conf"/>
     <property name="test.data" value="${test.dir}/data"/>
     <property name="test.name" value="*Test"/>
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
+    <property name="test.distributed.src" value="${test.dir}/distributed"/>
     <property name="dist.dir" value="${build.dir}/dist"/>
     <property name="base.version" value="0.7.0-rc4"/>
     <condition property="version" value="${base.version}">
@@ -105,6 +107,7 @@
         <fail unless="is.source.artifact"
             message="Not a source artifact, stopping here." />
         <mkdir dir="${build.classes}"/>
+        <mkdir dir="${test.lib}"/>
         <mkdir dir="${test.classes}"/>
         <mkdir dir="${build.src.gen-java}"/>
     </target>
@@ -165,10 +168,17 @@
     </target>
 
     <target name="ivy-retrieve-build" depends="ivy-init">
+      <ivy:resolve file="${basedir}/ivy.xml"/>
       <ivy:retrieve type="jar,source" sync="true"
              pattern="${build.dir.lib}/[type]s/[artifact]-[revision].[ext]" />
     </target>
 
+    <target name="ivy-retrieve-test" depends="ivy-init">
+      <ivy:resolve file="${basedir}/test/distributed/ivy.xml"/>
+      <ivy:retrieve type="jar,source" sync="true"
+             pattern="${test.lib}/[type]s/[artifact]-[revision].[ext]" />
+    </target>
+
     <!--
        Generate avro code
     -->
@@ -453,28 +463,49 @@
     </copy>
   </target>
 
+  <target name="build-distributed-test" depends="build-test,ivy-retrieve-test" 
description="Compile distributed test classes (which have additional deps)">
+    <javac
+     debug="true"
+     debuglevel="${debuglevel}"
+     destdir="${test.classes}">
+      <classpath>
+          <path refid="cassandra.classpath"/>
+          <pathelement location="${test.classes}"/>
+          <fileset dir="${test.lib}">
+            <include name="**/*.jar" />
+          </fileset>
+      </classpath>
+      <src path="${test.distributed.src}"/>
+    </javac>
+  </target>
+
   <macrodef name="testmacro">
     <attribute name="suitename" />
     <attribute name="inputdir" />
     <attribute name="timeout" />
+    <attribute name="forkmode" default="perTest"/>
+    <element name="optjvmargs" implicit="true" optional="true" />
     <sequential>
       <echo message="running @{suitename} tests"/>
       <mkdir dir="${build.test.dir}/cassandra"/>
       <mkdir dir="${build.test.dir}/output"/>
-      <junit fork="on" failureproperty="testfailed" maxmemory="1024m" 
timeout="@{timeout}">
+      <junit fork="on" forkmode="@{forkmode}" failureproperty="testfailed" 
maxmemory="1024m" timeout="@{timeout}">
         <sysproperty key="net.sourceforge.cobertura.datafile" 
file="${cobertura.datafile}"/>
         <formatter type="xml" usefile="true"/>
         <formatter type="brief" usefile="false"/>
         <jvmarg value="-Dstorage-config=${test.conf}"/>
         <jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
         <jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
-        <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
         <jvmarg value="-ea"/>
+        <optjvmargs/>
         <classpath>
           <path refid="cassandra.classpath" />
           <pathelement location="${test.classes}"/>
           <pathelement location="${cobertura.dir}/cobertura.jar"/>
           <pathelement location="${test.conf}"/>
+          <fileset dir="${test.lib}">
+            <include name="**/*.jar" />
+          </fileset>
         </classpath>
         <batchtest todir="${build.test.dir}/output">
           <fileset dir="@{inputdir}" includes="**/${test.name}.java" />
@@ -485,13 +516,23 @@
   </macrodef>
 
   <target name="test" depends="build-test" description="Execute unit tests">
-    <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000" />
+    <testmacro suitename="unit" inputdir="${test.unit.src}" timeout="60000">
+      <jvmarg value="-Dlegacy-sstable-root=${test.data}/legacy-sstables"/>
+    </testmacro>
   </target>
 
   <target name="long-test" depends="build-test" description="Execute 
functional tests">
     <testmacro suitename="long" inputdir="${test.long.src}" timeout="300000" />
   </target>
 
+  <!-- Depends on artifacts so that we can push a tarball to remote nodes, and 
has its own build target for cloudy deps. -->
+  <target name="distributed-test" depends="build-distributed-test,artifacts" 
description="Execute distributed tests: see ${test.distributed.src}/README.txt">
+    <testmacro suitename="distributed" inputdir="${test.distributed.src}" 
timeout="1200000" forkmode="once">
+      <jvmarg value="-Dwhirr.config=${whirr.config}"/>
+      <jvmarg 
value="-Dwhirr.cassandra_tarball=${build.dir}/${final.name}-bin.tar.gz"/>
+    </testmacro>
+  </target>
+
   <!-- instruments the classes to later create code coverage reports -->
   <target name="cobertura-instrument" depends="build,build-test">
     <taskdef resource="tasks.properties">

Modified: cassandra/branches/cassandra-0.7/ivysettings.xml
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/ivysettings.xml?rev=1055618&r1=1055617&r2=1055618&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/ivysettings.xml (original)
+++ cassandra/branches/cassandra-0.7/ivysettings.xml Wed Jan  5 20:16:14 2011
@@ -17,17 +17,18 @@
  ~ under the License.
  -->
 <ivysettings>
-  <settings defaultResolver="ibiblio"/>
+  <settings defaultResolver="chain"/>
   <resolvers>
     <chain name="chain" dual="true">
-      <ibiblio name="java.net2" root="http://download.java.net/maven/2/" m2compatible="true"/>
-      <ibiblio name="cloudera" root="https://repository.cloudera.com/content/repositories/releases/" m2compatible="true" />
-      <ibiblio name="ibiblio" m2compatible="true" />
+      <ibiblio name="ibiblio"                                                                           m2compatible="true"/>
+      <ibiblio name="java.net2" root="http://download.java.net/maven/2/"                                m2compatible="true"/>
+      <ibiblio name="apache"    root="https://repository.apache.org/content/repositories/releases/"     m2compatible="true"/>
+      <ibiblio name="cloudera"  root="https://repository.cloudera.com/content/repositories/releases/"   m2compatible="true"/>
+      <!-- for distributed tests -->
+      <ibiblio name="jclouds"   root="http://jclouds.googlecode.com/svn/repo"                           m2compatible="true"/>
+      <ibiblio name="oauth"     root="http://oauth.googlecode.com/svn/code/maven"                       m2compatible="true"/>
+      <ibiblio name="twttr"     root="http://maven.twttr.com/"                                          m2compatible="true"/>
     </chain>
   </resolvers>
-  <modules>
-    <module organisation="net.java.dev.jna" name="jna" resolver="chain" />
-    <module organisation="com.cloudera.hadoop" name="*" resolver="chain" />
-  </modules>
 </ivysettings>
 

Added: cassandra/branches/cassandra-0.7/test/distributed/README.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/README.txt?rev=1055618&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/distributed/README.txt (added)
+++ cassandra/branches/cassandra-0.7/test/distributed/README.txt Wed Jan  5 
20:16:14 2011
@@ -0,0 +1,56 @@
+Distributed Test Harness
+
+
+Sub-project description
+-----------------------
+
+A distributed test harness that deploys a cluster to a cloud provider,
+via Apache Whirr, runs tests against that cluster, then tears down
+the deployed cluster.
+
+Requirements
+------------
+  * A cloud provider account. [see: http://incubator.apache.org/whirr/]
+
+
+Getting started
+---------------
+
+First, set up an account with a supported cloud provider.  Then, refer to
+the Whirr documentation for configuration instructions:
+    * http://incubator.apache.org/whirr/quick-start-guide.html
+
+Set up your personal Whirr configuration properties.  The shared Whirr
+configuration is located at:
+    * test/resources/whirr-default.properties
+
+An example EC2/S3 whirr configuration would be:
+###############################################
+whirr.provider=ec2
+whirr.location-id=us-west-1
+whirr.image-id=us-west-1/ami-16f3a253
+whirr.hardware-id=m1.large
+whirr.identity=[EC2 Access Key ID]
+whirr.credential=[EC2 Secret Access Key]
+whirr.private-key-file=${sys:user.home}/.ssh/id_rsa
+whirr.public-key-file=${sys:user.home}/.ssh/id_rsa.pub
+whirr.run-url-base=http://hoodidge.net/scripts/
+whirr.blobstore.provider=s3
+whirr.blobstore.container=cassandratests
+###############################################
+
+The distributed tests are located in:
+    * test/distributed
+
+Run the tests via ant:
+    * ant distributed-test -Dwhirr.config=my-whirr.properties
+
+The ant target will:
+    * download extra dependencies via Apache Ivy
+    * compile the distributed tests
+    * push a tarball of the local build to a blobstore so the test nodes can fetch it
+    * deploy a cluster via Apache Whirr
+    * run the distributed tests against the cluster
+    * tear down the deployed cluster
+
+

Copied: cassandra/branches/cassandra-0.7/test/distributed/ivy.xml (from 
r1055594, cassandra/branches/cassandra-0.7/ivysettings.xml)
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/ivy.xml?p2=cassandra/branches/cassandra-0.7/test/distributed/ivy.xml&p1=cassandra/branches/cassandra-0.7/ivysettings.xml&r1=1055594&r2=1055618&rev=1055618&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/ivysettings.xml (original)
+++ cassandra/branches/cassandra-0.7/test/distributed/ivy.xml Wed Jan  5 
20:16:14 2011
@@ -7,7 +7,7 @@
  ~ "License"); you may not use this file except in compliance
  ~ with the License.  You may obtain a copy of the License at
  ~
- ~  http://www.apache.org/licenses/LICENSE-2.0
+ ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing,
  ~ software distributed under the License is distributed on an
@@ -16,18 +16,10 @@
  ~ specific language governing permissions and limitations
  ~ under the License.
  -->
-<ivysettings>
-  <settings defaultResolver="ibiblio"/>
-  <resolvers>
-    <chain name="chain" dual="true">
-      <ibiblio name="java.net2" root="http://download.java.net/maven/2/" m2compatible="true"/>
-      <ibiblio name="cloudera" root="https://repository.cloudera.com/content/repositories/releases/" m2compatible="true" />
-      <ibiblio name="ibiblio" m2compatible="true" />
-    </chain>
-  </resolvers>
-  <modules>
-    <module organisation="net.java.dev.jna" name="jna" resolver="chain" />
-    <module organisation="com.cloudera.hadoop" name="*" resolver="chain" />
-  </modules>
-</ivysettings>
-
+<ivy-module version="2.0">
+  <info organisation="apache-cassandra" module="cassandra-distributed-test"/>
+  <dependencies>
+    <dependency org="org.apache.whirr" name="whirr-core" 
rev="0.3.0-incubating-SNAPSHOT"/>
+    <dependency org="org.apache.whirr" name="whirr-cli" 
rev="0.3.0-incubating-SNAPSHOT"/>
+  </dependencies>
+</ivy-module>

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/CassandraServiceController.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/CassandraServiceController.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/CassandraServiceController.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/CassandraServiceController.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.*;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.KeyPair;
+import org.apache.cassandra.utils.BlobUtils;
+import org.apache.cassandra.utils.Pair;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
+
+import org.apache.whirr.service.Cluster;
+import org.apache.whirr.service.Cluster.Instance;
+import org.apache.whirr.service.ClusterSpec;
+import org.apache.whirr.service.ComputeServiceContextBuilder;
+import org.apache.whirr.service.Service;
+import org.apache.whirr.service.ServiceFactory;
+import org.apache.whirr.service.cassandra.CassandraService;
+import org.apache.whirr.service.cassandra.CassandraClusterActionHandler;
+import org.apache.whirr.service.jclouds.RunUrlStatement;
+
+import org.jclouds.blobstore.domain.BlobMetadata;
+
+import org.jclouds.compute.ComputeService;
+import org.jclouds.compute.domain.NodeMetadata;
+import org.jclouds.compute.options.RunScriptOptions;
+import org.jclouds.domain.Credentials;
+import org.jclouds.io.Payload;
+import org.jclouds.scriptbuilder.domain.OsFamily;
+import org.jclouds.ssh.ExecResponse;
+import static org.jclouds.io.Payloads.newStringPayload;
+
+import com.google.common.base.Predicate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertThat;
+
+public class CassandraServiceController
+{
+    private static final Logger LOG =
+        LoggerFactory.getLogger(CassandraServiceController.class);
+
+    protected static int CLIENT_PORT    = 9160;
+    protected static int JMX_PORT       = 8080;
+
+    private static final CassandraServiceController INSTANCE =
+        new CassandraServiceController();
+    
+    public static CassandraServiceController getInstance()
+    {
+        return INSTANCE;
+    }
+    
+    private boolean     running;
+
+    private ClusterSpec         clusterSpec;
+    private CassandraService    service;
+    private Cluster             cluster;
+    private ComputeService      computeService;
+    private Credentials         credentials;
+    private CompositeConfiguration config;
+    private BlobMetadata        tarball;
+    private List<InetAddress>   hosts;
+    
+    private CassandraServiceController()
+    {
+    }
+
+    public Cassandra.Client createClient(InetAddress addr)
+        throws TTransportException, TException
+    {
+        TTransport transport    = new TSocket(
+                                    addr.getHostAddress(),
+                                    CLIENT_PORT,
+                                    200000);
+        transport               = new TFramedTransport(transport);
+        TProtocol  protocol     = new TBinaryProtocol(transport);
+
+        Cassandra.Client client = new Cassandra.Client(protocol);
+        transport.open();
+
+        return client;
+    }
+
+    private void waitForClusterInitialization()
+    {
+        for (InetAddress host : hosts)
+            waitForNodeInitialization(host);
+    }
+    
+    /**
+     * Blocks until the node at the given address answers a Thrift
+     * describe_cluster_name call, sleeping one second between attempts.
+     * Returns early, with the thread's interrupt flag set, if the wait
+     * is interrupted.
+     *
+     * @param addr address of the node to poll
+     */
+    private void waitForNodeInitialization(InetAddress addr)
+    {
+        while (true)
+        {
+            try
+            {
+                // NOTE(review): the connection opened by createClient is never closed here
+                Cassandra.Client client = createClient(addr);
+
+                client.describe_cluster_name();
+                break;
+            }
+            catch (TException e)
+            {
+                // node is not accepting Thrift calls yet: back off, then retry
+                try
+                {
+                    Thread.sleep(1000);
+                }
+                catch (InterruptedException ie)
+                {
+                    // restore the interrupt flag so the caller can see the wait was cut short
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+    }
+
+    public synchronized void startup() throws Exception
+    {
+        LOG.info("Starting up cluster...");
+
+        config = new CompositeConfiguration();
+        if (System.getProperty("whirr.config") != null)
+        {
+            config.addConfiguration(
+                new 
PropertiesConfiguration(System.getProperty("whirr.config")));
+        }
+        config.addConfiguration(new 
PropertiesConfiguration("whirr-default.properties"));
+
+        clusterSpec = new ClusterSpec(config);
+        if (clusterSpec.getPrivateKey() == null)
+        {
+            Map<String, String> pair = KeyPair.generate();
+            clusterSpec.setPublicKey(pair.get("public"));
+            clusterSpec.setPrivateKey(pair.get("private"));
+        }
+
+        // if a local tarball is available deploy it to the blobstore where it 
will be available to cassandra
+        if (System.getProperty("whirr.cassandra_tarball") != null)
+        {
+            Pair<BlobMetadata,URI> blob = BlobUtils.storeBlob(config, 
clusterSpec, System.getProperty("whirr.cassandra_tarball"));
+            tarball = blob.left;
+            config.setProperty(CassandraClusterActionHandler.BIN_TARBALL, 
blob.right.toURL().toString());
+            // TODO: parse the CassandraVersion property file instead
+            config.setProperty(CassandraClusterActionHandler.MAJOR_VERSION, 
"0.7");
+        }
+
+        service = (CassandraService)new 
ServiceFactory().create(clusterSpec.getServiceName());
+        cluster = service.launchCluster(clusterSpec);
+        computeService = 
ComputeServiceContextBuilder.build(clusterSpec).getComputeService();
+        hosts = new ArrayList<InetAddress>();
+        for (Instance instance : cluster.getInstances())
+        {
+            hosts.add(instance.getPublicAddress());
+            credentials = instance.getLoginCredentials();
+        }
+
+        waitForClusterInitialization();
+
+        ShutdownHook shutdownHook = new ShutdownHook(this);
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        running = true;
+    }
+
+    /**
+     * Destroys the launched cluster and deletes the uploaded tarball blob.
+     * Never throws: this runs from a runtime shutdown hook, so every failure
+     * is caught and logged. Each cleanup step is attempted independently, so
+     * a failure to destroy the cluster does not leave the tarball behind.
+     */
+    public synchronized void shutdown()
+    {
+        LOG.info("Shutting down cluster...");
+        boolean clean = true;
+
+        try
+        {
+            if (service != null)
+                service.destroyCluster(clusterSpec);
+        }
+        catch (Exception e)
+        {
+            clean = false;
+            // pass the throwable itself so the stack trace is logged
+            LOG.error("Error shutting down cluster", e);
+        }
+
+        try
+        {
+            if (tarball != null)
+                BlobUtils.deleteBlob(config, clusterSpec, tarball);
+        }
+        catch (Exception e)
+        {
+            clean = false;
+            LOG.error("Error deleting cassandra tarball blob", e);
+        }
+
+        // as before, only mark the cluster stopped when cleanup completed without error
+        if (clean)
+            running = false;
+    }
+
+    public class ShutdownHook extends Thread
+    {
+        private CassandraServiceController controller;
+
+        public ShutdownHook(CassandraServiceController controller)
+        {
+            this.controller = controller;
+        }
+
+        public void run()
+        {
+            controller.shutdown();
+        }
+    }
+
+    public synchronized boolean ensureClusterRunning() throws Exception
+    {
+        if (running)
+        {
+            LOG.info("Cluster already running.");
+            return false;
+        }
+        else
+        {
+            startup();
+            return true;
+        }
+    }
+
+    /**
+     * Execute nodetool with args against localhost from each of the given hosts.
+     */
+    public void nodetool(String args, InetAddress... hosts)
+    {
+        callOnHosts(String.format("apache/cassandra/nodetool %s", args), 
hosts);
+    }
+
+    /**
+     * Wipes all persisted state for the given nodes, leaving them as if they had just started.
+     */
+    public void wipeHosts(InetAddress... hosts)
+    {
+        callOnHosts("apache/cassandra/wipe-state", hosts);
+    }
+
+    public Failure failHosts(List<InetAddress> hosts)
+    {
+        return new Failure(hosts.toArray(new 
InetAddress[hosts.size()])).trigger();
+    }
+
+    public Failure failHosts(InetAddress... hosts)
+    {
+        return new Failure(hosts).trigger();
+    }
+
+    /** TODO: Move to CassandraService? */
+    protected void callOnHosts(String payload, InetAddress... hosts)
+    {
+        final Set<String> hostset = new HashSet<String>();
+        for (InetAddress host : hosts)
+            hostset.add(host.getHostAddress());
+        Map<? extends NodeMetadata,ExecResponse> results;
+        try
+        {
+            results = computeService.runScriptOnNodesMatching(new 
Predicate<NodeMetadata>()
+            {
+                public boolean apply(NodeMetadata node)
+                {
+                    Set<String> intersection = new HashSet<String>(hostset);
+                    intersection.retainAll(node.getPublicAddresses());
+                    return !intersection.isEmpty();
+                }
+            }, newStringPayload(new 
RunUrlStatement(clusterSpec.getRunUrlBase(), payload).render(OsFamily.UNIX)),
+            RunScriptOptions.Builder.overrideCredentialsWith(credentials));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        if (results.size() != hostset.size())
+            throw new RuntimeException(results.size() + " hosts matched " + 
hostset + ": " + results);
+        for (ExecResponse response : results.values())
+            if (response.getExitCode() != 0)
+                throw new RuntimeException("Call " + payload + " failed on at 
least one of " + hostset + ": " + results.values());
+    }
+
+    public List<InetAddress> getHosts()
+    {
+        return hosts;
+    }
+
+    class Failure
+    {
+        private InetAddress[] hosts;
+
+        public Failure(InetAddress... hosts)
+        {
+            this.hosts = hosts;
+        }
+        
+        public Failure trigger()
+        {
+            callOnHosts("apache/cassandra/stop", hosts);
+            return this;
+        }
+
+        public void resolve()
+        {
+            callOnHosts("apache/cassandra/start", hosts);
+            for (InetAddress host : hosts)
+                waitForNodeInitialization(host);
+        }
+    }
+
+    public InetAddress getPublicHost(InetAddress privateHost)
+    {
+        for (Instance instance : cluster.getInstances())
+            if (privateHost.equals(instance.getPrivateAddress()))
+                return instance.getPublicAddress();
+        throw new RuntimeException("No public host for private host " + 
privateHost);
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MovementTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MovementTest.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MovementTest.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MovementTest.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+import org.apache.cassandra.CassandraServiceController.Failure;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+public class MovementTest extends TestBase
+{
+    private static final String STANDARD_CF = "Standard1";
+    private static final ColumnParent STANDARD = new ColumnParent(STANDARD_CF);
+
+    /** Inserts 1000 keys with names such that at least 1 key ends up on each 
host. */
+    private static Map<ByteBuffer,List<ColumnOrSuperColumn>> 
insertBatch(Cassandra.Client client) throws Exception
+    {
+        final int N = 1000;
+        Column col1 = new Column(
+            ByteBuffer.wrap("c1".getBytes()),
+            ByteBuffer.wrap("v1".getBytes()),
+            0
+            );
+        Column col2 = new Column(
+            ByteBuffer.wrap("c2".getBytes()),
+            ByteBuffer.wrap("v2".getBytes()),
+            0
+            );
+
+        // build N rows
+        Map<ByteBuffer,List<ColumnOrSuperColumn>> rows = new 
HashMap<ByteBuffer, List<ColumnOrSuperColumn>>();
+        Map<ByteBuffer,Map<String,List<Mutation>>> batch = new 
HashMap<ByteBuffer,Map<String,List<Mutation>>>();
+        for (int i = 0; i < N; i++)
+        {
+            String rawKey = String.format("test.key.%d", i);
+            ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes());
+            Mutation m1 = (new Mutation()).setColumn_or_supercolumn((new 
ColumnOrSuperColumn()).setColumn(col1));
+            Mutation m2 = (new Mutation()).setColumn_or_supercolumn((new 
ColumnOrSuperColumn()).setColumn(col2));
+            rows.put(key, Arrays.asList(m1.getColumn_or_supercolumn(),
+                                        m2.getColumn_or_supercolumn()));
+
+            // add row to batch
+            Map<String,List<Mutation>> rowmap = new 
HashMap<String,List<Mutation>>();
+            rowmap.put(STANDARD_CF, Arrays.asList(m1, m2));
+            batch.put(key, rowmap);
+        }
+        // insert the batch
+        client.batch_mutate(batch, ConsistencyLevel.ONE);
+        return rows;
+    }
+
+    /**
+     * Reads back every row of the given batch with get_slice at
+     * ConsistencyLevel.ONE and asserts that the returned columns equal
+     * the ones recorded for that key.
+     *
+     * @param client client to read through; its keyspace must already be set
+     * @param batch  expected columns per row key, as returned by insertBatch
+     */
+    private static void verifyBatch(Cassandra.Client client, Map<ByteBuffer,List<ColumnOrSuperColumn>> batch) throws Exception
+    {
+        for (Map.Entry<ByteBuffer,List<ColumnOrSuperColumn>> entry : batch.entrySet())
+        {
+            // verify slice
+            SlicePredicate sp = new SlicePredicate();
+            sp.setSlice_range(
+                new SliceRange(
+                    ByteBuffer.wrap(new byte[0]),
+                    ByteBuffer.wrap(new byte[0]),
+                    false,
+                    1000
+                    )
+                );
+            // expected value goes first so a failure message labels the two sides correctly
+            assertEquals(entry.getValue(),
+                         client.get_slice(entry.getKey(), STANDARD, sp, ConsistencyLevel.ONE));
+        }
+    }
+
+    @Test
+    public void testLoadbalance() throws Exception
+    {
+        final String keyspace = "TestLoadbalance";
+        addKeyspace(keyspace, 1);
+        List<InetAddress> hosts = controller.getHosts();
+        Cassandra.Client client = controller.createClient(hosts.get(0));
+        client.set_keyspace(keyspace);
+
+        // add keys to each node
+        Map<ByteBuffer,List<ColumnOrSuperColumn>> rows = insertBatch(client);
+
+        Thread.sleep(100);
+
+        // ask a node to move to a new location
+        controller.nodetool("loadbalance", hosts.get(0));
+
+        // trigger cleanup on all nodes
+        for (InetAddress host : hosts)
+            controller.nodetool("cleanup", host);
+
+        // check that all keys still exist
+        verifyBatch(client, rows);
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/MutationTest.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.utils.WrappedRunnable;
+import  org.apache.thrift.TException;
+import org.apache.cassandra.client.*;
+import org.apache.cassandra.dht.RandomPartitioner;
+
+import org.apache.cassandra.CassandraServiceController.Failure;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+public class MutationTest extends TestBase
+{
+    /**
+     * Inserts two columns for one key at ConsistencyLevel.ONE and checks
+     * that both can be read back, individually and through a slice.
+     */
+    @Test
+    public void testInsert() throws Exception
+    {
+        final String keyspace = "TestInsert";
+        final String cf = "Standard1";
+        final ConsistencyLevel one = ConsistencyLevel.ONE;
+
+        List<InetAddress> cluster = controller.getHosts();
+        addKeyspace(keyspace, 3);
+        Cassandra.Client client = controller.createClient(cluster.get(0));
+        client.set_keyspace(keyspace);
+
+        ByteBuffer rowKey = newKey();
+
+        insert(client, rowKey, cf, "c1", "v1", 0, one);
+        insert(client, rowKey, cf, "c2", "v2", 0, one);
+
+        // single-column reads
+        Column first = getColumn(client, rowKey, cf, "c1", one);
+        assertColumnEqual("c1", "v1", 0, first);
+        Column second = getColumn(client, rowKey, cf, "c2", one);
+        assertColumnEqual("c2", "v2", 0, second);
+
+        // slice read: both columns are expected, c1 before c2
+        List<ColumnOrSuperColumn> slice = get_slice(client, rowKey, cf, one);
+        assertColumnEqual("c1", "v1", 0, slice.get(0).column);
+        assertColumnEqual("c2", "v2", 0, slice.get(1).column);
+    }
+
+    /**
+     * Writes a column at ConsistencyLevel.ALL and reads it back at ONE.
+     * Then fails every replica for the key except the first and, through
+     * a host that is not a replica for the key, checks that a read at ONE
+     * still succeeds while a write at ALL is rejected with
+     * UnavailableException.
+     */
+    @Test
+    public void testWriteAllReadOne() throws Exception
+    {
+        List<InetAddress> hosts = controller.getHosts();
+        Cassandra.Client client = controller.createClient(hosts.get(0));
+
+        final String keyspace = "TestWriteAllReadOne";
+        addKeyspace(keyspace, 3);
+        client.set_keyspace(keyspace);
+
+        ByteBuffer key = newKey();
+
+        insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ALL);
+        assertColumnEqual("c1", "v1", 0,
+                          getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+
+        List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
+        InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
+        // fail every replica except the first one
+        Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
+
+        Thread.sleep(10000); // let gossip catch up
+
+        try {
+            client = controller.createClient(coordinator);
+            client.set_keyspace(keyspace);
+
+            assertColumnEqual("c1", "v1", 0,
+                              getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ONE));
+
+            insert(client, key, "Standard1", "c3", "v3", 0, ConsistencyLevel.ALL);
+            // 'assert false' is a no-op unless the JVM runs with -ea, so
+            // fail explicitly when the write unexpectedly succeeds
+            throw new AssertionError(
+                "write at ALL succeeded although replicas were failed");
+        } catch (UnavailableException e) {
+            // [this is good]
+        } finally {
+            failure.resolve();
+            Thread.sleep(10000);
+        }
+    }
+
+    /**
+     * Checks QUORUM behaviour on a keyspace created with replication
+     * factor 3: a QUORUM write must be rejected with UnavailableException
+     * while all but one replica for the key are failed, and a value written
+     * at QUORUM with all nodes up must still be readable at QUORUM after
+     * the first replica has been failed.
+     */
+    @Test
+    public void testWriteQuorumReadQuorum() throws Exception
+    {
+        List<InetAddress> hosts = controller.getHosts();
+        Cassandra.Client client = controller.createClient(hosts.get(0));
+
+        final String keyspace = "TestWriteQuorumReadQuorum";
+        addKeyspace(keyspace, 3);
+        client.set_keyspace(keyspace);
+
+        ByteBuffer key = newKey();
+
+        // with quorum-1 nodes up
+        List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
+        InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
+        // kill all but one of the replicas
+        Failure failure = controller.failHosts(endpoints.subList(1, endpoints.size()));
+
+        Thread.sleep(10000);
+        client = controller.createClient(coordinator);
+        client.set_keyspace(keyspace);
+        try {
+            insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.QUORUM);
+            // 'assert false' is a no-op unless the JVM runs with -ea, so
+            // fail explicitly when the write unexpectedly succeeds
+            throw new AssertionError(
+                "write at QUORUM succeeded although only one replica was up");
+        } catch (UnavailableException e) {
+            // [this is good]
+        } finally {
+            failure.resolve();
+            Thread.sleep(10000);
+        }
+
+        // with all nodes up
+        insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.QUORUM);
+
+        failure = controller.failHosts(endpoints.get(0));
+        Thread.sleep(10000);
+        try {
+            // two of the three replicas remain, so the QUORUM read must
+            // succeed and return the value written above
+            assertColumnEqual("c2", "v2", 0,
+                              getColumn(client, key, "Standard1", "c2", ConsistencyLevel.QUORUM));
+        } finally {
+            failure.resolve();
+            Thread.sleep(10000);
+        }
+    }
+
+    /**
+     * Writes a column at ConsistencyLevel.ONE and reads it back at ALL
+     * through a host that is not a replica for the key, then fails every
+     * replica for the key and checks that a write at ONE is rejected with
+     * UnavailableException.
+     */
+    @Test
+    public void testWriteOneReadAll() throws Exception
+    {
+        List<InetAddress> hosts = controller.getHosts();
+        Cassandra.Client client = controller.createClient(hosts.get(0));
+
+        final String keyspace = "TestWriteOneReadAll";
+        addKeyspace(keyspace, 3);
+        client.set_keyspace(keyspace);
+
+        ByteBuffer key = newKey();
+
+        List<InetAddress> endpoints = endpointsForKey(hosts.get(0), key, keyspace);
+        InetAddress coordinator = nonEndpointForKey(hosts.get(0), key, keyspace);
+        client = controller.createClient(coordinator);
+        client.set_keyspace(keyspace);
+
+        insert(client, key, "Standard1", "c1", "v1", 0, ConsistencyLevel.ONE);
+        assertColumnEqual("c1", "v1", 0,
+                          getColumn(client, key, "Standard1", "c1", ConsistencyLevel.ALL));
+
+        // TODO (not implemented yet):
+        // with each of HH, read repair and proactive repair:
+            // with one node up
+            // write with one (success)
+            // read with all (failure)
+            // bring nodes up
+            // repair
+            // read with all (success)
+
+        // fail every replica for the key
+        Failure failure = controller.failHosts(endpoints);
+        Thread.sleep(10000);
+        try {
+            insert(client, key, "Standard1", "c2", "v2", 0, ConsistencyLevel.ONE);
+            // 'assert false' is a no-op unless the JVM runs with -ea, so
+            // fail explicitly when the write unexpectedly succeeds
+            throw new AssertionError(
+                "write at ONE succeeded although every replica was failed");
+        } catch (UnavailableException e) {
+            // this is good
+        } finally {
+            failure.resolve();
+            // same settling pause the other tests in this class use after
+            // resolving a failure
+            Thread.sleep(10000);
+        }
+    }
+
+    /**
+     * Inserts a single column into the given column family.
+     *
+     * @param client    connection used to issue the insert
+     * @param key       row key
+     * @param cf        column family name
+     * @param name      column name; encoded with the platform default charset
+     * @param value     column value; encoded with the platform default charset
+     * @param timestamp column timestamp
+     * @param cl        consistency level for the write
+     */
+    protected void insert(Cassandra.Client client, ByteBuffer key, String cf,
+                          String name, String value, long timestamp, ConsistencyLevel cl)
+        throws InvalidRequestException, UnavailableException,
+               TimedOutException, TException
+    {
+        ByteBuffer columnName = ByteBuffer.wrap(name.getBytes());
+        ByteBuffer columnValue = ByteBuffer.wrap(value.getBytes());
+        Column column = new Column(columnName, columnValue, timestamp);
+        ColumnParent parent = new ColumnParent(cf);
+        client.insert(key, parent, column, cl);
+    }
+
+    /**
+     * Reads a single column.
+     *
+     * @param client connection used to issue the read
+     * @param key    row key
+     * @param cf     column family name
+     * @param col    column name; encoded with the platform default charset
+     * @param cl     consistency level for the read
+     * @return the {@code column} field of the result returned by the server
+     */
+    protected Column getColumn(Cassandra.Client client, ByteBuffer key,
+                               String cf, String col, ConsistencyLevel cl)
+        throws InvalidRequestException, UnavailableException,
+               TimedOutException, TException, NotFoundException
+    {
+        ColumnPath path = new ColumnPath(cf);
+        path.setColumn(col.getBytes());
+        ColumnOrSuperColumn result = client.get(key, path, cl);
+        return result.column;
+    }
+
+    /**
+     * Reads up to 1000 columns of a row, using empty start and finish
+     * column names and non-reversed order.
+     *
+     * @param client connection used to issue the read
+     * @param key    row key
+     * @param cf     column family name
+     * @param cl     consistency level for the read
+     * @return the columns returned by the server
+     */
+    protected List<ColumnOrSuperColumn> get_slice(Cassandra.Client client,
+                                                  ByteBuffer key, String cf, ConsistencyLevel cl)
+      throws InvalidRequestException, UnavailableException, TimedOutException,
+             TException
+    {
+        ByteBuffer start = ByteBuffer.wrap(new byte[0]);
+        ByteBuffer finish = ByteBuffer.wrap(new byte[0]);
+        SliceRange everything = new SliceRange(start, finish, false, 1000);
+
+        SlicePredicate predicate = new SlicePredicate();
+        predicate.setSlice_range(everything);
+
+        ColumnParent parent = new ColumnParent(cf);
+        return client.get_slice(key, parent, predicate, cl);
+    }
+
+    /**
+     * Asserts that a column carries the expected name, value and timestamp.
+     * The expected name and value are encoded with the platform default
+     * charset before being compared with the column's buffers.
+     *
+     * @param name      expected column name
+     * @param value     expected column value
+     * @param timestamp expected column timestamp
+     * @param col       column to check
+     */
+    protected void assertColumnEqual(String name, String value,
+                                     long timestamp, Column col)
+    {
+        ByteBuffer expectedName = ByteBuffer.wrap(name.getBytes());
+        assertEquals(expectedName, col.name);
+
+        ByteBuffer expectedValue = ByteBuffer.wrap(value.getBytes());
+        assertEquals(expectedValue, col.value);
+
+        assertEquals(timestamp, col.timestamp);
+    }
+
+    /**
+     * Looks up the endpoints for a key through a RingCache built against
+     * the seed host on port 9160 with a RandomPartitioner, and maps each
+     * address through {@code controller.getPublicHost}.
+     *
+     * @param seed     host the ring cache connects to
+     * @param key      row key to look up
+     * @param keyspace keyspace the key belongs to
+     * @return the mapped endpoint addresses, in the order the ring cache
+     *         returned them
+     */
+    protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer key,
+                                                String keyspace)
+        throws IOException
+    {
+        String seedAddress = seed.getHostAddress();
+        RingCache ringCache = new RingCache(keyspace, new RandomPartitioner(), seedAddress, 9160);
+
+        List<InetAddress> publicEndpoints = new ArrayList<InetAddress>();
+        for (InetAddress privateEndpoint : ringCache.getEndpoint(key))
+        {
+            InetAddress publicEndpoint = controller.getPublicHost(privateEndpoint);
+            publicEndpoints.add(publicEndpoint);
+        }
+        return publicEndpoints;
+    }
+
+    /**
+     * Finds a cluster host that is not among the endpoints for the key.
+     *
+     * @param seed     host used to look up the endpoints
+     * @param key      row key
+     * @param keyspace keyspace the key belongs to
+     * @return the first host from {@code controller.getHosts()} that is not
+     *         an endpoint for the key, or {@code null} when every host is one
+     */
+    protected InetAddress nonEndpointForKey(InetAddress seed, ByteBuffer key,
+                                            String keyspace)
+        throws IOException
+    {
+        List<InetAddress> replicas = endpointsForKey(seed, key, keyspace);
+        for (InetAddress candidate : controller.getHosts())
+        {
+            if (replicas.contains(candidate))
+                continue;
+            return candidate;
+        }
+        // every host is an endpoint for this key
+        return null;
+    }
+
+    /**
+     * Builds a row key of the form {@code test.key.<current time in millis>},
+     * encoded with the platform default charset.
+     *
+     * @return the key wrapped in a ByteBuffer
+     */
+    protected ByteBuffer newKey()
+    {
+        long now = System.currentTimeMillis();
+        String text = String.format("test.key.%d", now);
+        return ByteBuffer.wrap(text.getBytes());
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/TestBase.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/TestBase.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/TestBase.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/TestBase.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.InetAddress;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.thrift.TException;
+
+import org.apache.cassandra.thrift.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+
+/**
+ * Base class for the distributed tests: holds the shared service controller
+ * and provides helpers for creating keyspaces and temporary keys.
+ */
+public abstract class TestBase
+{
+    /** Controller instance shared by all tests. */
+    protected static CassandraServiceController controller =
+        CassandraServiceController.getInstance();
+
+    /**
+     * Creates a keyspace containing a single "Standard1" column family via
+     * the first host, then polls each host once a second until the keyspace
+     * appears in that host's keyspace list.
+     *
+     * Polling has no timeout. If the calling thread is interrupted while
+     * waiting, the interrupt flag is restored and the method returns
+     * without polling the remaining hosts.
+     *
+     * @param name name of the keyspace to create
+     * @param rf   replication factor passed to the keyspace definition
+     */
+    protected static void addKeyspace(String name, int rf) throws Exception
+    {
+        List<CfDef> cfDefList = new LinkedList<CfDef>();
+
+        CfDef standard1 = new CfDef(name, "Standard1");
+        standard1.setComparator_type("BytesType");
+        standard1.setKey_cache_size(10000);
+        standard1.setRow_cache_size(1000);
+        standard1.setRow_cache_save_period_in_seconds(0);
+        standard1.setKey_cache_save_period_in_seconds(3600);
+        standard1.setMemtable_flush_after_mins(59);
+        standard1.setMemtable_throughput_in_mb(255);
+        standard1.setMemtable_operations_in_millions(0.29);
+        cfDefList.add(standard1);
+
+        List<InetAddress> hosts = controller.getHosts();
+        Cassandra.Client client = controller.createClient(hosts.get(0));
+
+        client.system_add_keyspace(
+            new KsDef(
+                name,
+                "org.apache.cassandra.locator.SimpleStrategy",
+                rf,
+                cfDefList));
+
+        // poll, until KS added
+        for (InetAddress host : hosts)
+        {
+            try
+            {
+                client = controller.createClient(host);
+                poll:
+                while (true)
+                {
+                    List<KsDef> ksDefList = client.describe_keyspaces();
+                    for (KsDef ks : ksDefList)
+                    {
+                        if (ks.name.equals(name))
+                            break poll;
+                    }
+
+                    try
+                    {
+                        Thread.sleep(1000);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // keep the interruption visible to the caller and
+                        // stop waiting instead of polling the other hosts
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            }
+            catch (TException ignored)
+            {
+                // deliberately best effort: a host that cannot be queried
+                // is skipped and polling moves on to the next host
+            }
+        }
+    }
+
+    /** Makes sure the cluster is running before any test in the class. */
+    @BeforeClass
+    public static void setUp() throws Exception
+    {
+        controller.ensureClusterRunning();
+    }
+
+    /**
+     * @return a key of the form {@code test.key.<current time in millis>}
+     */
+    protected static String createTemporaryKey()
+    {
+        return String.format("test.key.%d", System.currentTimeMillis());
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/BlobUtils.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/BlobUtils.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/BlobUtils.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/BlobUtils.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.commons.configuration.Configuration;
+
+import org.apache.whirr.service.ClusterSpec;
+
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.BlobStoreContextFactory;
+import org.jclouds.blobstore.domain.BlobMetadata;
+import org.jclouds.blobstore.InputStreamMap;
+
+import org.jclouds.aws.s3.S3Client;
+import org.jclouds.aws.s3.S3AsyncClient;
+import org.jclouds.aws.s3.domain.AccessControlList;
+import org.jclouds.aws.s3.domain.CannedAccessPolicy;
+
+import org.jclouds.rest.RestContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers for uploading a local file to, and removing it from, the
+ * blob store named by the whirr configuration.
+ */
+public final class BlobUtils
+{
+    private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
+
+    /** Configuration key naming the blob store provider. */
+    public static final String BLOB_PROVIDER = "whirr.blobstore.provider";
+    /** Configuration key naming the container blobs are stored in. */
+    public static final String BLOB_CONTAINER = "whirr.blobstore.container";
+
+    /**
+     * Creates a blob store context for the configured provider, using the
+     * identity and credential of the cluster spec.
+     */
+    private static BlobStoreContext getContext(Configuration config, ClusterSpec spec)
+    {
+        BlobStoreContextFactory factory = new BlobStoreContextFactory();
+        String provider = getProvider(config);
+        return factory.createContext(provider, spec.getIdentity(), spec.getCredential());
+    }
+
+    /**
+     * Returns the configured provider.
+     *
+     * @throws RuntimeException if {@link #BLOB_PROVIDER} is not set
+     */
+    private static String getProvider(Configuration config)
+    {
+        String provider = config.getString(BLOB_PROVIDER, null);
+        if (provider != null)
+            return provider;
+        throw new RuntimeException("Please set " + BLOB_PROVIDER + " to a jclouds supported provider.");
+    }
+
+    /**
+     * Returns the configured container.
+     *
+     * @throws RuntimeException if {@link #BLOB_CONTAINER} is not set
+     */
+    private static String getContainer(Configuration config)
+    {
+        String container = config.getString(BLOB_CONTAINER, null);
+        if (container != null)
+            return container;
+        throw new RuntimeException("Please set " + BLOB_CONTAINER + " to an existing container for your chosen provider.");
+    }
+
+    /**
+     * Uploads the given local file as a blob and returns the blob's
+     * metadata together with its resolved URI. The blob is made publicly
+     * readable when the provider is "s3"; for any other provider a warning
+     * is logged instead.
+     *
+     * @param config   configuration naming the provider and container
+     * @param spec     cluster spec supplying identity and credential
+     * @param filename path of the local file to upload
+     * @return the blob metadata paired with the blob's URI
+     */
+    public static Pair<BlobMetadata,URI> storeBlob(Configuration config, ClusterSpec spec, String filename)
+    {
+        File localFile = new File(filename);
+        String container = getContainer(config);
+        String provider = getProvider(config);
+        // prefix with nanoTime so repeated uploads of one file do not collide
+        String blobName = System.nanoTime() + "/" + localFile.getName();
+        BlobStoreContext context = getContext(config, spec);
+        try
+        {
+            context.createInputStreamMap(container).putFile(blobName, localFile);
+
+            // TODO: exposing the blob as public requires provider specific
+            // APIs; the hope is that jclouds encapsulates permissions in
+            // the future
+            if (!"s3".equals(provider))
+            {
+                LOG.warn(provider + " may not be properly supported for tarball transfer.");
+            }
+            else
+            {
+                S3Client s3 = context.<S3Client,S3AsyncClient>getProviderSpecificContext().getApi();
+                String ownerId = s3.getObjectACL(container, blobName).getOwner().getId();
+                AccessControlList publicRead =
+                    AccessControlList.fromCannedAccessPolicy(CannedAccessPolicy.PUBLIC_READ, ownerId);
+                s3.putObjectACL(container, blobName, publicRead);
+            }
+
+            // resolve the full URI of the blob
+            // (see http://code.google.com/p/jclouds/issues/detail?id=431)
+            BlobMetadata metadata = context.getBlobStore().blobMetadata(container, blobName);
+            URI endpoint = context.getProviderSpecificContext().getEndpoint();
+            URI blobUri = endpoint.resolve("/" + container + "/" + metadata.getName());
+            return new Pair<BlobMetadata, URI>(metadata, blobUri);
+        }
+        finally
+        {
+            context.close();
+        }
+    }
+
+    /**
+     * Removes the given blob from the configured container.
+     *
+     * @param config configuration naming the provider and container
+     * @param spec   cluster spec supplying identity and credential
+     * @param blob   metadata of the blob to remove
+     */
+    public static void deleteBlob(Configuration config, ClusterSpec spec, BlobMetadata blob)
+    {
+        String container = getContainer(config);
+        BlobStoreContext context = getContext(config, spec);
+        try
+        {
+            String blobName = blob.getName();
+            context.getBlobStore().removeBlob(container, blobName);
+        }
+        finally
+        {
+            context.close();
+        }
+    }
+}

Added: 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/KeyPair.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/KeyPair.java?rev=1055618&view=auto
==============================================================================
--- 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/KeyPair.java
 (added)
+++ 
cassandra/branches/cassandra-0.7/test/distributed/org/apache/cassandra/utils/KeyPair.java
 Wed Jan  5 20:16:14 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+
+/**
+ * Convenience class that generates an RSA key pair through JSch.
+ */
+public class KeyPair {
+
+  /**
+   * Generates an RSA key pair.
+   *
+   * @return a map with two entries: "public" holds the public key, written
+   *         with the comment "whirr", and "private" holds the matching
+   *         private key; both are decoded with the platform default charset
+   */
+  public static Map<String,String> generate() throws JSchException {
+    JSch jsch = new JSch();
+    com.jcraft.jsch.KeyPair rsaPair =
+        com.jcraft.jsch.KeyPair.genKeyPair(jsch, com.jcraft.jsch.KeyPair.RSA);
+
+    ByteArrayOutputStream publicSink = new ByteArrayOutputStream();
+    ByteArrayOutputStream privateSink = new ByteArrayOutputStream();
+    rsaPair.writePublicKey(publicSink, "whirr");
+    rsaPair.writePrivateKey(privateSink);
+
+    // toString() decodes with the platform default charset
+    String publicText = publicSink.toString();
+    String privateText = privateSink.toString();
+    return ImmutableMap.<String, String> of(
+        "public", publicText,
+        "private", privateText);
+  }
+}

Added: cassandra/branches/cassandra-0.7/test/resources/whirr-default.properties
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/resources/whirr-default.properties?rev=1055618&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/resources/whirr-default.properties 
(added)
+++ cassandra/branches/cassandra-0.7/test/resources/whirr-default.properties 
Wed Jan  5 20:16:14 2011
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+whirr.service-name=cassandra
+whirr.cluster-name=cassandra_test
+whirr.instance-templates=4 cassandra
+whirr.version=0.3.0-incubating-SNAPSHOT


Reply via email to