Author: jbellis
Date: Thu Aug 12 18:32:12 2010
New Revision: 984904

URL: http://svn.apache.org/viewvc?rev=984904&view=rev
Log:
Remove references to DatabaseDescriptor from Pig, RingCache, and 
ColumnFamilyRecordWriter (CFRW), and remove it as a fallback from 
ColumnFamilyRecordReader (CFRR). cassandra.yaml is no longer needed by Pig or 
word_count.
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1322

Removed:
    cassandra/trunk/contrib/pig/cassandra.yaml
    cassandra/trunk/contrib/word_count/cassandra.yaml
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/pig/README.txt
    cassandra/trunk/contrib/pig/bin/pig_cassandra   (contents, props changed)
    cassandra/trunk/contrib/pig/build.xml
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 12 18:32:12 2010
@@ -1,4 +1,9 @@
-0.7.0-beta1
+dev
+ * remove cassandra.yaml dependency from Hadoop and Pig (CASSANDRA-1322)
+ * expose CfDef metadata in describe_keyspaces (CASSANDRA-1363)
+
+
+0.7-beta1
  * sstable versioning (CASSANDRA-389)
  * switched to slf4j logging (CASSANDRA-625)
  * access levels for authentication/authorization (CASSANDRA-900)

Modified: cassandra/trunk/contrib/pig/README.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Thu Aug 12 18:32:12 2010
@@ -3,11 +3,19 @@ A Pig LoadFunc that reads all columns fr
 Setup:
 
 First build and start a Cassandra server with the default
-configuration* and set the PIG_HOME and JAVA_HOME environment
+configuration and set the PIG_HOME and JAVA_HOME environment
 variables to the location of a Pig >= 0.7.0 install and your Java
-install. If you would like to run using the Hadoop backend, you should
+install. 
+
+If you would like to run using the Hadoop backend, you should
 also set PIG_CONF_DIR to the location of your Hadoop config.
 
+Finally, set the following as environment variables (uppercase,
+underscored), or as Hadoop configuration variables (lowercase, dotted):
+* PIG_RPC_PORT or cassandra.thrift.port : the port Thrift is listening on
+* PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to
+* PIG_PARTITIONER or cassandra.partitioner.class : class name of the cluster's partitioner
+
 Run:
 
 contrib/pig$ ant
@@ -32,6 +40,3 @@ grunt> namecounts = FOREACH namegroups G
 grunt> orderednames = ORDER namecounts BY $0;
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
-
-*If you want to point Pig at a real cluster, modify the seed
-address in cassandra.yaml and re-run the build step.

Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
    (empty)

Propchange: cassandra/trunk/contrib/pig/bin/pig_cassandra
------------------------------------------------------------------------------
    svn:executable = *

Modified: cassandra/trunk/contrib/pig/build.xml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Thu Aug 12 18:32:12 2010
@@ -30,7 +30,7 @@
     <property name="final.name" value="cassandra_loadfunc" />
 
     <path id="pig.classpath">
-        <fileset file="${env.PIG_HOME}/pig*core.jar" />
+        <fileset file="${env.PIG_HOME}/pig*.jar" />
         <fileset dir="${cassandra.dir}/lib">
             <include name="libthrift*.jar" />
         </fileset>

Modified: 
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- 
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 (original)
+++ 
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 Thu Aug 12 18:32:12 2010
@@ -46,6 +46,12 @@ import org.apache.pig.data.TupleFactory;
  */
 public class CassandraStorage extends LoadFunc
 {
+    // system environment variables that can be set to configure connection 
info:
+    // alternatively, Hadoop JobConf variables can be set using keys from 
ConfigHelper
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+
     private final static byte[] BOUND = new byte[0];
     private final static int LIMIT = 1024;
 
@@ -135,6 +141,14 @@ public class CassandraStorage extends Lo
         conf = job.getConfiguration();
         ConfigHelper.setInputSlicePredicate(conf, predicate);
         ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
+
+        // check the environment for connection information
+        if (System.getenv(PIG_RPC_PORT) != null)
+            ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+            ConfigHelper.setInitialAddress(conf, 
System.getenv(PIG_INITIAL_ADDRESS));
+        if (System.getenv(PIG_PARTITIONER) != null)
+            ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Thu Aug 
12 18:32:12 2010
@@ -19,8 +19,8 @@ package org.apache.cassandra.client;
 
 import java.util.*;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.TokenRange;
@@ -40,8 +39,8 @@ import org.apache.thrift.protocol.TBinar
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
 
 /**
  *  A class for caching the ring map at the client. For usage example, see
@@ -51,22 +50,21 @@ public class RingCache
 {
     final private static Logger logger_ = 
LoggerFactory.getLogger(RingCache.class);
 
-    private Set<String> seeds_ = new HashSet<String>();
-    final private int port_= DatabaseDescriptor.getRpcPort();
-    final private static IPartitioner partitioner_ = 
DatabaseDescriptor.getPartitioner();
+    private final Set<String> seeds_ = new HashSet<String>();
+    private final int port_;
+    private final IPartitioner partitioner_;
     private final String keyspace;
-    private TokenMetadata tokenMetadata;
 
-    public RingCache(String keyspace) throws IOException
+    private Set<Range> rangeSet;
+    private Multimap<Range, InetAddress> rangeMap;
+
+    public RingCache(String keyspace, IPartitioner partitioner, String 
addresses, int port) throws IOException
     {
-        for (InetAddress seed : DatabaseDescriptor.getSeeds())
-        {
-            seeds_.add(seed.getHostAddress());
-        }
-        
+        for (String seed : addresses.split(","))
+            seeds_.add(seed);
+        this.port_ = port;
         this.keyspace = keyspace;
-        
-        DatabaseDescriptor.loadSchemas();
+        this.partitioner_ = partitioner;
         refreshEndpointMap();
     }
 
@@ -82,16 +80,17 @@ public class RingCache
                 socket.open();
 
                 List<TokenRange> ring = client.describe_ring(keyspace);
-                BiMap<Token, InetAddress> tokenEndpointMap = 
HashBiMap.create();
+                rangeMap = HashMultimap.create();
                 
                 for (TokenRange range : ring)
                 {
-                    Token<?> token = 
StorageService.getPartitioner().getTokenFactory().fromString(range.start_token);
+                    Token<?> left = 
partitioner_.getTokenFactory().fromString(range.start_token);
+                    Token<?> right = 
partitioner_.getTokenFactory().fromString(range.end_token);
                     String host = range.endpoints.get(0);
                     
                     try
                     {
-                        tokenEndpointMap.put(token, 
InetAddress.getByName(host));
+                        rangeMap.put(new Range(left, right, partitioner_), 
InetAddress.getByName(host));
                     }
                     catch (UnknownHostException e)
                     {
@@ -99,7 +98,7 @@ public class RingCache
                     }
                 }
 
-                tokenMetadata = new TokenMetadata(tokenEndpointMap);
+                rangeSet = new HashSet(rangeMap.keySet());
 
                 break;
             }
@@ -115,11 +114,17 @@ public class RingCache
         }
     }
 
-    public List<InetAddress> getEndpoint(byte[] key)
+    public Collection<InetAddress> getEndpoint(byte[] key)
     {
-        if (tokenMetadata == null)
+        if (rangeSet == null)
             throw new RuntimeException("Must refresh endpoints before looking 
up a key.");
-        AbstractReplicationStrategy strat = 
StorageService.createReplicationStrategy(tokenMetadata, keyspace);
-        return strat.getNaturalEndpoints(partitioner_.getToken(key));
+
+        // TODO: naive linear search of the token map
+        Token<?> t = partitioner_.getToken(key);
+        for (Range range : rangeSet)
+            if (range.contains(t))
+                return rangeMap.get(range);
+
+        throw new RuntimeException("Invalid token information returned by 
describe_ring: " + rangeMap);
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
 Thu Aug 12 18:32:12 2010
@@ -32,7 +32,6 @@ import org.apache.cassandra.auth.AllowAl
 import org.apache.cassandra.auth.SimpleAuthenticator;
 
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.clock.TimestampReconciler;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
 Thu Aug 12 18:32:12 2010
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Clock;
@@ -106,7 +105,10 @@ final class ColumnFamilyRecordWriter ext
     {
         this.context = context;
         this.mutationsByEndpoint = new 
HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
-        this.ringCache = new 
RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
+        this.ringCache = new 
RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()),
+                                       
ConfigHelper.getPartitioner(context.getConfiguration()),
+                                       
ConfigHelper.getInitialAddress(context.getConfiguration()),
+                                       
ConfigHelper.getRpcPort(context.getConfiguration()));
         this.batchThreshold = 
context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 
Long.MAX_VALUE);
     }
     
@@ -120,10 +122,7 @@ final class ColumnFamilyRecordWriter ext
      */
     protected InetAddress getEndpoint(byte[] key)
     {
-        List<InetAddress> endpoints = ringCache.getEndpoint(key);
-        return endpoints != null && endpoints.size() > 0
-               ? endpoints.get(0)
-               : null;
+        return ringCache.getEndpoint(key).iterator().next();
     }
 
     /**
@@ -327,7 +326,7 @@ final class ColumnFamilyRecordWriter ext
             TSocket socket = null;
             try
             {
-                socket = new TSocket(endpoint.getHostName(), 
DatabaseDescriptor.getRpcPort());
+                socket = new TSocket(endpoint.getHostName(), 
ConfigHelper.getRpcPort(taskContext.getConfiguration()));
                 Cassandra.Client client = 
ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
                 client.batch_mutate(mutations, ConsistencyLevel.ONE);
                 return null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu 
Aug 12 18:32:12 2010
@@ -20,8 +20,7 @@ package org.apache.cassandra.hadoop;
  * 
  */
 
-
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.thrift.protocol.TBinar
 
 public class ConfigHelper
 {
+    private static final String PARTITIONER_CONFIG = 
"cassandra.partitioner.class";
     private static final String INPUT_KEYSPACE_CONFIG = 
"cassandra.input.keyspace";
     private static final String OUTPUT_KEYSPACE_CONFIG = 
"cassandra.output.keyspace";
     private static final String INPUT_KEYSPACE_USERNAME_CONFIG = 
"cassandra.input.keyspace.username";
@@ -95,21 +95,6 @@ public class ConfigHelper
     }
 
     /**
-     * The address and port of a Cassandra node that Hadoop can contact over 
Thrift
-     * to learn more about the Cassandra cluster.  Optional when 
storage-conf.xml
-     * is provided.
-     *
-     * @param conf
-     * @param address
-     * @param port
-     */
-    public static void setThriftContact(Configuration conf, String address, 
int port)
-    {
-        conf.set(THRIFT_PORT, String.valueOf(port));
-        conf.set(INITIAL_THRIFT_ADDRESS, address);
-    }
-
-    /**
      * The number of rows to request with each get range slices request.
      * Too big and you can either get timeouts when it takes Cassandra too
      * long to fetch all the data. Too small and the performance
@@ -244,13 +229,31 @@ public class ConfigHelper
 
     public static int getRpcPort(Configuration conf)
     {
-        String v = conf.get(THRIFT_PORT);
-        return v == null ? DatabaseDescriptor.getRpcPort() : 
Integer.valueOf(v);
+        return Integer.valueOf(conf.get(THRIFT_PORT));
+    }
+
+    public static void setRpcPort(Configuration conf, String port)
+    {
+        conf.set(THRIFT_PORT, port);
     }
 
     public static String getInitialAddress(Configuration conf)
     {
-        String v = conf.get(INITIAL_THRIFT_ADDRESS);
-        return v == null ? 
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
+        return conf.get(INITIAL_THRIFT_ADDRESS);
+    }
+
+    public static void setInitialAddress(Configuration conf, String address)
+    {
+        conf.set(INITIAL_THRIFT_ADDRESS, address);
+    }
+
+    public static void setPartitioner(Configuration conf, String classname)
+    {
+        conf.set(PARTITIONER_CONFIG, classname); 
+    }
+
+    public static IPartitioner getPartitioner(Configuration conf)
+    {
+        return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG)); 
     }
 }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java 
Thu Aug 12 18:32:12 2010
@@ -19,23 +19,16 @@ package org.apache.cassandra.client;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collection;
 
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.Clock;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.thrift.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 
-import org.apache.cassandra.thrift.AuthenticationRequest;
-
 /**
  *  Sample code that uses RingCache in the client.
  */
@@ -46,7 +39,8 @@ public class TestRingCache
 
     public TestRingCache(String keyspace) throws IOException
     {
-       ringCache = new RingCache(keyspace);
+        String seed = 
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+       ringCache = new RingCache(keyspace, 
DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
     }
     
     private void setup(String server, int port) throws Exception
@@ -97,14 +91,13 @@ public class TestRingCache
             ColumnPath col = new 
ColumnPath("Standard1").setSuper_column(null).setColumn("col1".getBytes());
             ColumnParent parent = new 
ColumnParent("Standard1").setSuper_column(null);
 
-            List<InetAddress> endpoints = tester.ringCache.getEndpoint(row);
-            String hosts="";
-            for (int i = 0; i < endpoints.size(); i++)
-                hosts = hosts + ((i > 0) ? "," : "") + endpoints.get(i);
-            System.out.println("hosts with key " + new String(row) + " : " + 
hosts + "; choose " + endpoints.get(0));
+            Collection<InetAddress> endpoints = 
tester.ringCache.getEndpoint(row);
+            InetAddress firstEndpoint = endpoints.iterator().next();
+            System.out.printf("hosts with key %s : %s; choose %s%n",
+                              new String(row), StringUtils.join(endpoints, 
","), firstEndpoint);
 
             // now, read the row back directly from the host owning the row 
locally
-            tester.setup(endpoints.get(0).getHostAddress(), 
DatabaseDescriptor.getRpcPort());
+            tester.setup(firstEndpoint.getHostAddress(), 
DatabaseDescriptor.getRpcPort());
             tester.thriftClient.set_keyspace(keyspace);
             Clock clock = new Clock();
             clock.setTimestamp(1);


Reply via email to