Author: jbellis
Date: Wed Oct 19 04:22:58 2011
New Revision: 1185965
URL: http://svn.apache.org/viewvc?rev=1185965&view=rev
Log:
merge from 0.8
Modified:
cassandra/branches/cassandra-1.0/ (props changed)
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/contrib/ (props changed)
cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java
Propchange: cassandra/branches/cassandra-1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7:1026516-1183000
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/branches/cassandra-1.0:1167106,1167185
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Wed Oct 19 04:22:58 2011
@@ -18,6 +18,17 @@
* Fix CLI `show schema` to include "compression_options" (CASSANDRA-3368)
* Snapshot to include manifest under LeveledCompactionStrategy
(CASSANDRA-3359)
* (CQL) SELECT query should allow CF name to be qualified by keyspace
(CASSANDRA-3130)
+ * Display CLI version string on startup (CASSANDRA-3196)
+ * (Hadoop) make CFIF try rpc_address or fallback to listen_address
+ (CASSANDRA-3214)
+ * (Hadoop) accept comma delimited lists of initial thrift connections
+ (CASSANDRA-3185)
+ * ColumnFamily min_compaction_threshold should be >= 2 (CASSANDRA-3342)
+ * (Pig) add 0.8+ types and key validation type in schema (CASSANDRA-3280)
+ * Fix completely removing column metadata using CLI (CASSANDRA-3126)
+ * (CQL) Fix internal application error specifying 'using consistency ...'
+ in lower case (CASSANDRA-3366)
+
1.0.0-final
* close scrubbed sstable fd before deleting it (CASSANDRA-3318)
Propchange: cassandra/branches/cassandra-1.0/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1183000
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/branches/cassandra-1.0/contrib:1167106,1167185
Modified: cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-1.0/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Oct 19 04:22:58 2011
@@ -107,7 +107,7 @@ public class CassandraStorage extends Lo
return limit;
}
- @Override
+ @Override
public Tuple getNext() throws IOException
{
try
@@ -122,7 +122,7 @@ public class CassandraStorage extends Lo
assert key != null && cf != null;
// and wrap it in a tuple
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
ArrayList<Tuple> columns = new ArrayList<Tuple>();
tuple.set(0, new DataByteArray(key.array(),
key.position()+key.arrayOffset(), key.limit()+key.arrayOffset()));
for (Map.Entry<ByteBuffer, IColumn> entry : cf.entrySet())
@@ -187,10 +187,12 @@ public class CassandraStorage extends Lo
ArrayList<AbstractType> marshallers = new ArrayList<AbstractType>();
AbstractType comparator = null;
AbstractType default_validator = null;
+ AbstractType key_validator = null;
try
{
- comparator = TypeParser.parse(cfDef.comparator_type);
- default_validator =
TypeParser.parse(cfDef.default_validation_class);
+ comparator = TypeParser.parse(cfDef.getComparator_type());
+ default_validator =
TypeParser.parse(cfDef.getDefault_validation_class());
+ key_validator = TypeParser.parse(cfDef.getKey_validation_class());
}
catch (ConfigurationException e)
{
@@ -199,13 +201,14 @@ public class CassandraStorage extends Lo
marshallers.add(comparator);
marshallers.add(default_validator);
+ marshallers.add(key_validator);
return marshallers;
}
- private Map<ByteBuffer,AbstractType> getValidatorMap(CfDef cfDef) throws
IOException
+ private Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws
IOException
{
Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer,
AbstractType>();
- for (ColumnDef cd : cfDef.column_metadata)
+ for (ColumnDef cd : cfDef.getColumn_metadata())
{
if (cd.getValidation_class() != null &&
!cd.getValidation_class().isEmpty())
{
@@ -236,6 +239,18 @@ public class CassandraStorage extends Lo
this.reader = reader;
}
+ public static Map<String, String> getQueryMap(String query)
+ {
+ String[] params = query.split("&");
+ Map<String, String> map = new HashMap<String, String>();
+ for (String param : params)
+ {
+ String[] keyValue = param.split("=");
+ map.put(keyValue[0], keyValue[1]);
+ }
+ return map;
+ }
+
private void setLocationFromUri(String location) throws IOException
{
// parse uri into keyspace and columnfamily
@@ -247,18 +262,18 @@ public class CassandraStorage extends Lo
String[] urlParts = location.split("\\?");
if (urlParts.length > 1)
{
- for (String param : urlParts[1].split("&"))
- {
- String[] pair = param.split("=");
- if (pair[0].equals("slice_start"))
- slice_start = ByteBufferUtil.bytes(pair[1]);
- else if (pair[0].equals("slice_end"))
- slice_end = ByteBufferUtil.bytes(pair[1]);
- else if (pair[0].equals("reversed"))
- slice_reverse = Boolean.parseBoolean(pair[1]);
- else if (pair[0].equals("limit"))
- limit = Integer.parseInt(pair[1]);
- }
+ Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+ AbstractType comparator = BytesType.instance;
+ if (urlQuery.containsKey("comparator"))
+ comparator = TypeParser.parse(urlQuery.get("comparator"));
+ if (urlQuery.containsKey("slice_start"))
+ slice_start =
comparator.fromString(urlQuery.get("slice_start"));
+ if (urlQuery.containsKey("slice_end"))
+ slice_end =
comparator.fromString(urlQuery.get("slice_end"));
+ if (urlQuery.containsKey("reversed"))
+ slice_reverse =
Boolean.parseBoolean(urlQuery.get("reversed"));
+ if (urlQuery.containsKey("limit"))
+ limit = Integer.parseInt(urlQuery.get("limit"));
}
String[] parts = urlParts[0].split("/+");
keyspace = parts[1];
@@ -312,10 +327,14 @@ public class CassandraStorage extends Lo
// top-level schema, no type
ResourceSchema schema = new ResourceSchema();
+ // get default marshallers and validators
+ List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+ Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+
// add key
ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
keyFieldSchema.setName("key");
- keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+ keyFieldSchema.setType(getPigType(marshallers.get(2)));
// will become the bag of tuples
ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
@@ -323,9 +342,6 @@ public class CassandraStorage extends Lo
bagFieldSchema.setType(DataType.BAG);
ResourceSchema bagSchema = new ResourceSchema();
-
- List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
- Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
List<ResourceFieldSchema> tupleFields = new
ArrayList<ResourceFieldSchema>();
// default comparator/validator
@@ -381,6 +397,10 @@ public class CassandraStorage extends Lo
return DataType.CHARARRAY;
else if (type instanceof UTF8Type)
return DataType.CHARARRAY;
+ else if (type instanceof FloatType)
+ return DataType.FLOAT;
+ else if (type instanceof DoubleType)
+ return DataType.DOUBLE;
return DataType.BYTEARRAY;
}
@@ -545,7 +565,7 @@ public class CassandraStorage extends Lo
Cassandra.Client client = null;
try
{
- client =
createConnection(ConfigHelper.getInitialAddress(conf),
ConfigHelper.getRpcPort(conf), true);
+ client = ConfigHelper.getClientFromAddressList(conf);
CfDef cfDef = null;
client.set_keyspace(keyspace);
KsDef ksDef = client.describe_keyspace(keyspace);
@@ -579,21 +599,6 @@ public class CassandraStorage extends Lo
}
}
- private static Cassandra.Client createConnection(String host, Integer
port, boolean framed) throws IOException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = framed ? new TFramedTransport(socket) : socket;
- try
- {
- trans.open();
- }
- catch (TTransportException e)
- {
- throw new IOException("unable to connect to server", e);
- }
- return new Cassandra.Client(new TBinaryProtocol(trans));
- }
-
private static String cfdefToString(CfDef cfDef)
{
assert cfDef != null;
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185
Propchange: cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 19 04:22:58 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1183000
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1183002,1183241,1185761,1185960-1185961,1185963
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1185761,1185960-1185961,1185963
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/Cli.g Wed Oct 19 04:22:58 2011
@@ -401,8 +401,8 @@ attrValue
arrayConstruct
- : '[' (hashConstruct ','?)+ ']'
- -> ^(ARRAY (hashConstruct)+)
+ : '[' (hashConstruct ','?)* ']'
+ -> ^(ARRAY (hashConstruct)*)
;
hashConstruct
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cli/CliClient.java Wed Oct 19 04:22:58 2011
@@ -189,6 +189,7 @@ public class CliClient
public void printBanner()
{
sessionState.out.println(getHelp().banner);
+ sessionState.out.println("Cassandra CLI version " +
FBUtilities.getReleaseVersionString());
}
// Execute a CLI Statement
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/client/RingCache.java Wed Oct 19 04:22:58 2011
@@ -21,25 +21,22 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.TokenRange;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TException;
-import org.apache.cassandra.thrift.TBinaryProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
/**
* A class for caching the ring map at the client. For usage example, see
@@ -50,42 +47,32 @@ public class RingCache
{
final private static Logger logger_ =
LoggerFactory.getLogger(RingCache.class);
- private final Set<String> seeds_ = new HashSet<String>();
- private final int port_;
- private final IPartitioner<?> partitioner_;
- private final String keyspace;
+ private final IPartitioner<?> partitioner;
+ private final Configuration conf;
private Multimap<Range, InetAddress> rangeMap;
- public RingCache(String keyspace, IPartitioner<?> partitioner, String
addresses, int port) throws IOException
+ public RingCache(Configuration conf) throws IOException
{
- for (String seed : addresses.split(","))
- seeds_.add(seed);
- this.port_ = port;
- this.keyspace = keyspace;
- this.partitioner_ = partitioner;
+ this.conf = conf;
+ this.partitioner = ConfigHelper.getPartitioner(conf);
refreshEndpointMap();
}
public void refreshEndpointMap()
{
- for (String seed : seeds_)
- {
- try
- {
- TSocket socket = new TSocket(seed, port_);
- TBinaryProtocol binaryProtocol = new TBinaryProtocol(new
TFramedTransport(socket));
- Cassandra.Client client = new Cassandra.Client(binaryProtocol);
- socket.open();
+ try {
+
+ Cassandra.Client client =
ConfigHelper.getClientFromAddressList(conf);
- List<TokenRange> ring = client.describe_ring(keyspace);
+ List<TokenRange> ring =
client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
rangeMap = ArrayListMultimap.create();
for (TokenRange range : ring)
{
- Token<?> left =
partitioner_.getTokenFactory().fromString(range.start_token);
- Token<?> right =
partitioner_.getTokenFactory().fromString(range.end_token);
- Range r = new Range(left, right, partitioner_);
+ Token<?> left =
partitioner.getTokenFactory().fromString(range.start_token);
+ Token<?> right =
partitioner.getTokenFactory().fromString(range.end_token);
+ Range r = new Range(left, right, partitioner);
for (String host : range.endpoints)
{
try
@@ -98,19 +85,20 @@ public class RingCache
}
}
}
- break;
}
catch (InvalidRequestException e)
{
throw new RuntimeException(e);
}
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
catch (TException e)
{
- /* let the Exception go and try another seed. log this though
*/
- logger_.debug("Error contacting seed " + seed + " " +
e.getMessage());
+ logger_.debug("Error contacting seed list" +
ConfigHelper.getInitialAddress(conf) + " " + e.getMessage());
}
}
- }
/** ListMultimap promises to return a List for get(K) */
public List<InetAddress> getEndpoint(Range range)
@@ -126,7 +114,7 @@ public class RingCache
public Range getRange(ByteBuffer key)
{
// TODO: naive linear search of the token map
- Token<?> t = partitioner_.getToken(key);
+ Token<?> t = partitioner.getToken(key);
for (Range range : rangeMap.keySet())
if (range.contains(t))
return range;
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/cql/Cql.g Wed Oct 19 04:22:58 2011
@@ -148,7 +148,7 @@ selectStatement returns [SelectStatement
| K_COUNT '(' s2=selectExpression ')' { expression = s2; isCountOp =
true; }
)
K_FROM (keyspace=(IDENT | STRING_LITERAL | INTEGER) '.')?
columnFamily=( IDENT | STRING_LITERAL | INTEGER )
- ( K_USING K_CONSISTENCY K_LEVEL { cLevel =
ConsistencyLevel.valueOf($K_LEVEL.text); } )?
+ ( K_USING K_CONSISTENCY K_LEVEL { cLevel =
ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()); } )?
( K_WHERE whereClause )?
( K_LIMIT rows=INTEGER { numRecords = Integer.parseInt($rows.text);
} )?
endStmnt
@@ -231,7 +231,7 @@ usingClauseDelete[Attributes attrs]
;
usingClauseDeleteObjective[Attributes attrs]
- : K_CONSISTENCY K_LEVEL {
attrs.setConsistencyLevel(ConsistencyLevel.valueOf($K_LEVEL.text)); }
+ : K_CONSISTENCY K_LEVEL {
attrs.setConsistencyLevel(ConsistencyLevel.valueOf($K_LEVEL.text.toUpperCase()));
}
| K_TIMESTAMP ts=INTEGER { attrs.setTimestamp(Long.valueOf($ts.text)); }
;
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Wed Oct 19 04:22:58 2011
@@ -24,16 +24,17 @@ package org.apache.cassandra.hadoop;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
@@ -41,14 +42,16 @@ import org.apache.cassandra.thrift.Cassa
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.TokenRange;
-import org.apache.cassandra.thrift.TBinaryProtocol;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Hadoop InputFormat allowing map/reduce against Cassandra rows within one
ColumnFamily.
@@ -188,13 +191,17 @@ public class ColumnFamilyInputFormat ext
{
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
List<String> tokens = getSubSplits(keyspace, cfName, range, conf);
-
+ assert range.rpc_endpoints.size() == range.endpoints.size() :
"rpc_endpoints size must match endpoints size";
// turn the sub-ranges into InputSplits
String[] endpoints = range.endpoints.toArray(new
String[range.endpoints.size()]);
// hadoop needs hostname, not ip
- for (int i = 0; i < endpoints.length; i++)
+ int endpointIndex = 0;
+ for (String endpoint: range.rpc_endpoints)
{
- endpoints[i] =
InetAddress.getByName(endpoints[i]).getHostName();
+ String endpoint_address = endpoint;
+ if(endpoint_address == null || endpoint_address ==
"0.0.0.0")
+ endpoint_address =
range.endpoints.get(endpointIndex);
+ endpoints[endpointIndex++] =
InetAddress.getByName(endpoint_address).getHostName();
}
for (int i = 1; i < tokens.size(); i++)
@@ -210,11 +217,11 @@ public class ColumnFamilyInputFormat ext
private List<String> getSubSplits(String keyspace, String cfName,
TokenRange range, Configuration conf) throws IOException
{
int splitsize = ConfigHelper.getInputSplitSize(conf);
- for (String host : range.endpoints)
+ for (String host : range.rpc_endpoints)
{
try
{
- Cassandra.Client client = createConnection(host,
ConfigHelper.getRpcPort(conf), true);
+ Cassandra.Client client = ConfigHelper.createConnection(host,
ConfigHelper.getRpcPort(conf), true);
client.set_keyspace(keyspace);
return client.describe_splits(cfName, range.start_token,
range.end_token, splitsize);
}
@@ -234,47 +241,10 @@ public class ColumnFamilyInputFormat ext
throw new IOException("failed connecting to all endpoints " +
StringUtils.join(range.endpoints, ","));
}
- private static Cassandra.Client createConnection(String host, Integer
port, boolean framed) throws IOException
- {
- TSocket socket = new TSocket(host, port);
- TTransport trans = framed ? new TFramedTransport(socket) : socket;
- try
- {
- trans.open();
- }
- catch (TTransportException e)
- {
- throw new IOException("unable to connect to server", e);
- }
- return new Cassandra.Client(new TBinaryProtocol(trans));
- }
private List<TokenRange> getRangeMap(Configuration conf) throws IOException
{
- String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
- Cassandra.Client client = null;
- List<IOException> exceptions = new ArrayList<IOException>();
- for (String address : addresses)
- {
- try
- {
- client = createConnection(address,
ConfigHelper.getRpcPort(conf), true);
- break;
- }
- catch (IOException ioe)
- {
- exceptions.add(ioe);
- }
- }
- if (client == null)
- {
- logger.error("failed to connect to any initial addresses");
- for (IOException ioe : exceptions)
- {
- logger.error("", ioe);
- }
- throw exceptions.get(exceptions.size() - 1);
- }
+ Cassandra.Client client = ConfigHelper.getClientFromAddressList(conf);
List<TokenRange> map;
try
@@ -292,6 +262,8 @@ public class ColumnFamilyInputFormat ext
return map;
}
+
+
public RecordReader<ByteBuffer, SortedMap<ByteBuffer, IColumn>>
createRecordReader(InputSplit inputSplit, TaskAttemptContext
taskAttemptContext) throws IOException, InterruptedException
{
return new ColumnFamilyRecordReader();
Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Wed Oct 19 04:22:58 2011
@@ -95,10 +95,7 @@ implements org.apache.hadoop.mapred.Reco
ColumnFamilyRecordWriter(Configuration conf) throws IOException
{
this.conf = conf;
- this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
- ConfigHelper.getPartitioner(conf),
- ConfigHelper.getInitialAddress(conf),
- ConfigHelper.getRpcPort(conf));
+ this.ringCache = new RingCache(conf);
this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 *
Runtime.getRuntime().availableProcessors());
this.clients = new HashMap<Range,RangeClient>();
batchThreshold =
conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
Modified:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
(original)
+++
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
Wed Oct 19 04:22:58 2011
@@ -19,9 +19,13 @@ package org.apache.cassandra.hadoop;
* under the License.
*
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.TBinaryProtocol;
@@ -31,6 +35,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class ConfigHelper
{
@@ -54,6 +65,9 @@ public class ConfigHelper
private static final String INITIAL_THRIFT_ADDRESS =
"cassandra.thrift.address";
private static final String READ_CONSISTENCY_LEVEL =
"cassandra.consistencylevel.read";
private static final String WRITE_CONSISTENCY_LEVEL =
"cassandra.consistencylevel.write";
+
+ private static final Logger logger =
LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
/**
* Set the keyspace and column family for the input of this job.
@@ -331,4 +345,50 @@ public class ConfigHelper
throw new RuntimeException(e);
}
}
+
+
+ public static Cassandra.Client getClientFromAddressList(Configuration
conf) throws IOException
+ {
+ String[] addresses = ConfigHelper.getInitialAddress(conf).split(",");
+ Cassandra.Client client = null;
+ List<IOException> exceptions = new ArrayList<IOException>();
+ for (String address : addresses)
+ {
+ try
+ {
+ client = createConnection(address,
ConfigHelper.getRpcPort(conf), true);
+ break;
+ }
+ catch (IOException ioe)
+ {
+ exceptions.add(ioe);
+ }
+ }
+ if (client == null)
+ {
+ logger.error("failed to connect to any initial addresses");
+ for (IOException ioe : exceptions)
+ {
+ logger.error("", ioe);
+ }
+ throw exceptions.get(exceptions.size() - 1);
+ }
+ return client;
+ }
+
+ public static Cassandra.Client createConnection(String host, Integer port,
boolean framed)
+ throws IOException
+ {
+ TSocket socket = new TSocket(host, port);
+ TTransport trans = framed ? new TFramedTransport(socket) : socket;
+ try
+ {
+ trans.open();
+ }
+ catch (TTransportException e)
+ {
+ throw new IOException("unable to connect to server", e);
+ }
+ return new Cassandra.Client(new TBinaryProtocol(trans));
+ }
}
Modified:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
(original)
+++
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/thrift/ThriftValidation.java
Wed Oct 19 04:22:58 2011
@@ -691,19 +691,11 @@ public class ThriftValidation
{
if (cf_def.isSetMin_compaction_threshold() &&
cf_def.isSetMax_compaction_threshold())
{
- if ((cf_def.min_compaction_threshold >
cf_def.max_compaction_threshold)
- && cf_def.max_compaction_threshold != 0)
- {
- throw new ConfigurationException("min_compaction_threshold
cannot be greater than max_compaction_threshold");
- }
+ validateMinCompactionThreshold(cf_def.min_compaction_threshold,
cf_def.max_compaction_threshold);
}
else if (cf_def.isSetMin_compaction_threshold())
{
- if (cf_def.min_compaction_threshold >
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD)
- {
- throw new
ConfigurationException(String.format("min_compaction_threshold cannot be
greather than max_compaction_threshold (default %d)",
-
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD));
- }
+ validateMinCompactionThreshold(cf_def.min_compaction_threshold,
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD);
}
else if (cf_def.isSetMax_compaction_threshold())
{
@@ -718,6 +710,26 @@ public class ThriftValidation
}
}
+ public static void validateMinCompactionThreshold(int
min_compaction_threshold, int max_compaction_threshold) throws
ConfigurationException
+ {
+ if (min_compaction_threshold <= 1)
+ throw new ConfigurationException("min_compaction_threshold cannot
be less than 2.");
+
+ if (min_compaction_threshold > max_compaction_threshold &&
max_compaction_threshold != 0)
+ throw new
ConfigurationException(String.format("min_compaction_threshold cannot be
greater than max_compaction_threshold %d",
+
max_compaction_threshold));
+ }
+
+ public static void
validateMemtableSettings(org.apache.cassandra.thrift.CfDef cf_def) throws
ConfigurationException
+ {
+ if (cf_def.isSetMemtable_flush_after_mins())
+
DatabaseDescriptor.validateMemtableFlushPeriod(cf_def.memtable_flush_after_mins);
+ if (cf_def.isSetMemtable_throughput_in_mb())
+
DatabaseDescriptor.validateMemtableThroughput(cf_def.memtable_throughput_in_mb);
+ if (cf_def.isSetMemtable_operations_in_millions())
+
DatabaseDescriptor.validateMemtableOperations(cf_def.memtable_operations_in_millions);
+ }
+
public static void validateKeyspaceNotYetExisting(String newKsName) throws
InvalidRequestException
{
// keyspace names must be unique case-insensitively because the
keyspace name becomes the directory
Modified:
cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java
(original)
+++
cassandra/branches/cassandra-1.0/test/distributed/org/apache/cassandra/TestBase.java
Wed Oct 19 04:22:58 2011
@@ -301,6 +301,8 @@ public abstract class TestBase
protected List<InetAddress> endpointsForKey(InetAddress seed, ByteBuffer
key, String keyspace)
throws IOException
{
+ Configuration conf = new Configuration();
+
RingCache ring = new RingCache(keyspace, new RandomPartitioner(),
seed.getHostAddress(), 9160);
List<InetAddress> privateendpoints = ring.getEndpoint(key);
List<InetAddress> endpoints = new ArrayList<InetAddress>();
Modified:
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java
(original)
+++
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/cli/CliTest.java
Wed Oct 19 04:22:58 2011
@@ -158,6 +158,7 @@ public class CliTest extends CleanupHelp
"drop column family cF8;",
"create keyspace TESTIN;",
"drop keyspace tesTIN;",
+ "update column family 123 with comparator=UTF8Type and
column_metadata=[];",
"drop column family 123;",
"create column family myCF with column_type='Super' and
comparator='UTF8Type' AND subcomparator='UTF8Type';",
"assume myCF keys as utf8;",
Modified:
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=1185965&r1=1185964&r2=1185965&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java
(original)
+++
cassandra/branches/cassandra-1.0/test/unit/org/apache/cassandra/client/TestRingCache.java
Wed Oct 19 04:22:58 2011
@@ -23,16 +23,18 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -42,11 +44,12 @@ public class TestRingCache
{
private RingCache ringCache;
private Cassandra.Client thriftClient;
+ private Configuration conf;
public TestRingCache(String keyspace) throws IOException
{
- String seed =
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
- ringCache = new RingCache(keyspace,
DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
+ ConfigHelper.setOutputColumnFamily(conf, keyspace, "Standard1");
+ ringCache = new RingCache(conf);
}
private void setup(String server, int port) throws Exception
@@ -58,6 +61,12 @@ public class TestRingCache
Cassandra.Client cassandraClient = new
Cassandra.Client(binaryProtocol);
socket.open();
thriftClient = cassandraClient;
+ String seed =
DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+ conf = new Configuration();
+ ConfigHelper.setPartitioner(conf,
DatabaseDescriptor.getPartitioner().getClass().getName());
+ ConfigHelper.setInitialAddress(conf, seed);
+ ConfigHelper.setRpcPort(conf,
Integer.toString(DatabaseDescriptor.getRpcPort()));
+
}
/**