Author: jbellis
Date: Thu Aug 12 18:32:12 2010
New Revision: 984904
URL: http://svn.apache.org/viewvc?rev=984904&view=rev
Log:
Remove references to DatabaseDescriptor from Pig, RingCache, and CFRW, and
remove it as a fallback from CFRR. cassandra.yaml is no longer needed by pig or
word_count.
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1322
Removed:
cassandra/trunk/contrib/pig/cassandra.yaml
cassandra/trunk/contrib/word_count/cassandra.yaml
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/pig/README.txt
cassandra/trunk/contrib/pig/bin/pig_cassandra (contents, props changed)
cassandra/trunk/contrib/pig/build.xml
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 12 18:32:12 2010
@@ -1,4 +1,9 @@
-0.7.0-beta1
+dev
+ * remove cassandra.yaml dependency from Hadoop and Pig (CASSANDRA-1322)
+ * expose CfDef metadata in describe_keyspaces (CASSANDRA-1633)
+
+
+0.7-beta1
* sstable versioning (CASSANDRA-389)
* switched to slf4j logging (CASSANDRA-625)
* access levels for authentication/authorization (CASSANDRA-900)
Modified: cassandra/trunk/contrib/pig/README.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Thu Aug 12 18:32:12 2010
@@ -3,11 +3,19 @@ A Pig LoadFunc that reads all columns fr
Setup:
First build and start a Cassandra server with the default
-configuration* and set the PIG_HOME and JAVA_HOME environment
+configuration and set the PIG_HOME and JAVA_HOME environment
variables to the location of a Pig >= 0.7.0 install and your Java
-install. If you would like to run using the Hadoop backend, you should
+install.
+
+If you would like to run using the Hadoop backend, you should
also set PIG_CONF_DIR to the location of your Hadoop config.
+Finally, set the following as environment variables (uppercase,
+underscored), or as Hadoop configuration variables (lowercase, dotted):
+* PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on
+* PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect
to
+* PIG_PARTITIONER or cassandra.partitioner.class : cluster partitioner
+
Run:
contrib/pig$ ant
@@ -32,6 +40,3 @@ grunt> namecounts = FOREACH namegroups G
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
-
-*If you want to point Pig at a real cluster, modify the seed
-address in cassandra.yaml and re-run the build step.
Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
(empty)
Propchange: cassandra/trunk/contrib/pig/bin/pig_cassandra
------------------------------------------------------------------------------
svn:executable = *
Modified: cassandra/trunk/contrib/pig/build.xml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Thu Aug 12 18:32:12 2010
@@ -30,7 +30,7 @@
<property name="final.name" value="cassandra_loadfunc" />
<path id="pig.classpath">
- <fileset file="${env.PIG_HOME}/pig*core.jar" />
+ <fileset file="${env.PIG_HOME}/pig*.jar" />
<fileset dir="${cassandra.dir}/lib">
<include name="libthrift*.jar" />
</fileset>
Modified:
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
---
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Thu Aug 12 18:32:12 2010
@@ -46,6 +46,12 @@ import org.apache.pig.data.TupleFactory;
*/
public class CassandraStorage extends LoadFunc
{
+ // system environment variables that can be set to configure connection
info:
+ // alternatively, Hadoop JobConf variables can be set using keys from
ConfigHelper
+ public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+ public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+ public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+
private final static byte[] BOUND = new byte[0];
private final static int LIMIT = 1024;
@@ -135,6 +141,14 @@ public class CassandraStorage extends Lo
conf = job.getConfiguration();
ConfigHelper.setInputSlicePredicate(conf, predicate);
ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
+
+ // check the environment for connection information
+ if (System.getenv(PIG_RPC_PORT) != null)
+ ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+ if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+ ConfigHelper.setInitialAddress(conf,
System.getenv(PIG_INITIAL_ADDRESS));
+ if (System.getenv(PIG_PARTITIONER) != null)
+ ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Thu Aug
12 18:32:12 2010
@@ -19,8 +19,8 @@ package org.apache.cassandra.client;
import java.util.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
@@ -29,7 +29,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TokenRange;
@@ -40,8 +39,8 @@ import org.apache.thrift.protocol.TBinar
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
/**
* A class for caching the ring map at the client. For usage example, see
@@ -51,22 +50,21 @@ public class RingCache
{
final private static Logger logger_ =
LoggerFactory.getLogger(RingCache.class);
- private Set<String> seeds_ = new HashSet<String>();
- final private int port_= DatabaseDescriptor.getRpcPort();
- final private static IPartitioner partitioner_ =
DatabaseDescriptor.getPartitioner();
+ private final Set<String> seeds_ = new HashSet<String>();
+ private final int port_;
+ private final IPartitioner partitioner_;
private final String keyspace;
- private TokenMetadata tokenMetadata;
- public RingCache(String keyspace) throws IOException
+ private Set<Range> rangeSet;
+ private Multimap<Range, InetAddress> rangeMap;
+
+ public RingCache(String keyspace, IPartitioner partitioner, String
addresses, int port) throws IOException
{
- for (InetAddress seed : DatabaseDescriptor.getSeeds())
- {
- seeds_.add(seed.getHostAddress());
- }
-
+ for (String seed : addresses.split(","))
+ seeds_.add(seed);
+ this.port_ = port;
this.keyspace = keyspace;
-
- DatabaseDescriptor.loadSchemas();
+ this.partitioner_ = partitioner;
refreshEndpointMap();
}
@@ -82,16 +80,17 @@ public class RingCache
socket.open();
List<TokenRange> ring = client.describe_ring(keyspace);
- BiMap<Token, InetAddress> tokenEndpointMap =
HashBiMap.create();
+ rangeMap = HashMultimap.create();
for (TokenRange range : ring)
{
- Token<?> token =
StorageService.getPartitioner().getTokenFactory().fromString(range.start_token);
+ Token<?> left =
partitioner_.getTokenFactory().fromString(range.start_token);
+ Token<?> right =
partitioner_.getTokenFactory().fromString(range.end_token);
String host = range.endpoints.get(0);
try
{
- tokenEndpointMap.put(token,
InetAddress.getByName(host));
+ rangeMap.put(new Range(left, right, partitioner_),
InetAddress.getByName(host));
}
catch (UnknownHostException e)
{
@@ -99,7 +98,7 @@ public class RingCache
}
}
- tokenMetadata = new TokenMetadata(tokenEndpointMap);
+ rangeSet = new HashSet(rangeMap.keySet());
break;
}
@@ -115,11 +114,17 @@ public class RingCache
}
}
- public List<InetAddress> getEndpoint(byte[] key)
+ public Collection<InetAddress> getEndpoint(byte[] key)
{
- if (tokenMetadata == null)
+ if (rangeSet == null)
throw new RuntimeException("Must refresh endpoints before looking
up a key.");
- AbstractReplicationStrategy strat =
StorageService.createReplicationStrategy(tokenMetadata, keyspace);
- return strat.getNaturalEndpoints(partitioner_.getToken(key));
+
+ // TODO: naive linear search of the token map
+ Token<?> t = partitioner_.getToken(key);
+ for (Range range : rangeSet)
+ if (range.contains(t))
+ return rangeMap.get(range);
+
+ throw new RuntimeException("Invalid token information returned by
describe_ring: " + rangeMap);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
Thu Aug 12 18:32:12 2010
@@ -32,7 +32,6 @@ import org.apache.cassandra.auth.AllowAl
import org.apache.cassandra.auth.SimpleAuthenticator;
import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.clock.TimestampReconciler;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Thu Aug 12 18:32:12 2010
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Clock;
@@ -106,7 +105,10 @@ final class ColumnFamilyRecordWriter ext
{
this.context = context;
this.mutationsByEndpoint = new
HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
- this.ringCache = new
RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
+ this.ringCache = new
RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()),
+
ConfigHelper.getPartitioner(context.getConfiguration()),
+
ConfigHelper.getInitialAddress(context.getConfiguration()),
+
ConfigHelper.getRpcPort(context.getConfiguration()));
this.batchThreshold =
context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD,
Long.MAX_VALUE);
}
@@ -120,10 +122,7 @@ final class ColumnFamilyRecordWriter ext
*/
protected InetAddress getEndpoint(byte[] key)
{
- List<InetAddress> endpoints = ringCache.getEndpoint(key);
- return endpoints != null && endpoints.size() > 0
- ? endpoints.get(0)
- : null;
+ return ringCache.getEndpoint(key).iterator().next();
}
/**
@@ -327,7 +326,7 @@ final class ColumnFamilyRecordWriter ext
TSocket socket = null;
try
{
- socket = new TSocket(endpoint.getHostName(),
DatabaseDescriptor.getRpcPort());
+ socket = new TSocket(endpoint.getHostName(),
ConfigHelper.getRpcPort(taskContext.getConfiguration()));
Cassandra.Client client =
ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
client.batch_mutate(mutations, ConsistencyLevel.ONE);
return null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu
Aug 12 18:32:12 2010
@@ -20,8 +20,7 @@ package org.apache.cassandra.hadoop;
*
*/
-
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.thrift.protocol.TBinar
public class ConfigHelper
{
+ private static final String PARTITIONER_CONFIG =
"cassandra.partitioner.class";
private static final String INPUT_KEYSPACE_CONFIG =
"cassandra.input.keyspace";
private static final String OUTPUT_KEYSPACE_CONFIG =
"cassandra.output.keyspace";
private static final String INPUT_KEYSPACE_USERNAME_CONFIG =
"cassandra.input.keyspace.username";
@@ -95,21 +95,6 @@ public class ConfigHelper
}
/**
- * The address and port of a Cassandra node that Hadoop can contact over
Thrift
- * to learn more about the Cassandra cluster. Optional when
storage-conf.xml
- * is provided.
- *
- * @param conf
- * @param address
- * @param port
- */
- public static void setThriftContact(Configuration conf, String address,
int port)
- {
- conf.set(THRIFT_PORT, String.valueOf(port));
- conf.set(INITIAL_THRIFT_ADDRESS, address);
- }
-
- /**
* The number of rows to request with each get range slices request.
* Too big and you can either get timeouts when it takes Cassandra too
* long to fetch all the data. Too small and the performance
@@ -244,13 +229,31 @@ public class ConfigHelper
public static int getRpcPort(Configuration conf)
{
- String v = conf.get(THRIFT_PORT);
- return v == null ? DatabaseDescriptor.getRpcPort() :
Integer.valueOf(v);
+ return Integer.valueOf(conf.get(THRIFT_PORT));
+ }
+
+ public static void setRpcPort(Configuration conf, String port)
+ {
+ conf.set(THRIFT_PORT, port);
}
public static String getInitialAddress(Configuration conf)
{
- String v = conf.get(INITIAL_THRIFT_ADDRESS);
- return v == null ?
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress() : v;
+ return conf.get(INITIAL_THRIFT_ADDRESS);
+ }
+
+ public static void setInitialAddress(Configuration conf, String address)
+ {
+ conf.set(INITIAL_THRIFT_ADDRESS, address);
+ }
+
+ public static void setPartitioner(Configuration conf, String classname)
+ {
+ conf.set(PARTITIONER_CONFIG, classname);
+ }
+
+ public static IPartitioner getPartitioner(Configuration conf)
+ {
+ return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG));
}
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
Thu Aug 12 18:32:12 2010
@@ -19,23 +19,16 @@ package org.apache.cassandra.client;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collection;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.Clock;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.thrift.*;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-
/**
* Sample code that uses RingCache in the client.
*/
@@ -46,7 +39,8 @@ public class TestRingCache
public TestRingCache(String keyspace) throws IOException
{
- ringCache = new RingCache(keyspace);
+ String seed =
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+ ringCache = new RingCache(keyspace,
DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
}
private void setup(String server, int port) throws Exception
@@ -97,14 +91,13 @@ public class TestRingCache
ColumnPath col = new
ColumnPath("Standard1").setSuper_column(null).setColumn("col1".getBytes());
ColumnParent parent = new
ColumnParent("Standard1").setSuper_column(null);
- List<InetAddress> endpoints = tester.ringCache.getEndpoint(row);
- String hosts="";
- for (int i = 0; i < endpoints.size(); i++)
- hosts = hosts + ((i > 0) ? "," : "") + endpoints.get(i);
- System.out.println("hosts with key " + new String(row) + " : " +
hosts + "; choose " + endpoints.get(0));
+ Collection<InetAddress> endpoints =
tester.ringCache.getEndpoint(row);
+ InetAddress firstEndpoint = endpoints.iterator().next();
+ System.out.printf("hosts with key %s : %s; choose %s%n",
+ new String(row), StringUtils.join(endpoints,
","), firstEndpoint);
// now, read the row back directly from the host owning the row
locally
- tester.setup(endpoints.get(0).getHostAddress(),
DatabaseDescriptor.getRpcPort());
+ tester.setup(firstEndpoint.getHostAddress(),
DatabaseDescriptor.getRpcPort());
tester.thriftClient.set_keyspace(keyspace);
Clock clock = new Clock();
clock.setTimestamp(1);