Author: eevans
Date: Fri Dec 31 17:24:45 2010
New Revision: 1054140
URL: http://svn.apache.org/viewvc?rev=1054140&view=rev
Log:
port CQL server code, avro -> thrift
Patch by eevans; reviewed by jbellis for CASSANDRA-1913
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=1054140&r1=1054139&r2=1054140&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri
Dec 31 17:24:45 2010
@@ -1150,54 +1150,4 @@ public class CassandraServer implements
}
return null;
}
-
- @Override
- public CqlResult execute_cql_query(ByteBuffer query, Compression
compression)
- throws UnavailableException, InvalidRequestException, TimedOutException
- {
- String queryString = null;
-
- // Decompress the query string.
- try
- {
- switch (compression)
- {
- case GZIP:
- Inflater decompressor = new Inflater();
- decompressor.setInput(query.array(), 0,
query.array().length);
-
- ByteArrayOutputStream byteArray = new
ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
-
- while (!decompressor.finished())
- {
- int size = decompressor.inflate(buffer);
- byteArray.write(buffer, 0, size);
- }
-
- decompressor.end();
-
- queryString = new String(byteArray.toByteArray(), 0,
byteArray.size(), "UTF-8");
- }
- }
- catch (DataFormatException e)
- {
- throw newInvalidRequestException("Error deflating query string.");
- }
- catch (UnsupportedEncodingException e)
- {
- throw newInvalidRequestException("Unknown query string encoding.");
- }
-
- try
- {
- return QueryProcessor.process(queryString, state());
- }
- catch (RecognitionException e)
- {
- InvalidRequestException badQuery =
newInvalidRequestException("Invalid or malformed CQL query string");
- badQuery.initCause(e);
- throw badQuery;
- }
- }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1054140&r1=1054139&r2=1054140&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Fri Dec 31 17:24:45
2010
@@ -10,7 +10,7 @@ options {
import java.util.HashMap;
import java.util.Collections;
import org.apache.cassandra.thrift.ConsistencyLevel;
- import org.apache.cassandra.avro.InvalidRequestException;
+ import org.apache.cassandra.thrift.InvalidRequestException;
}
@members {
@@ -31,11 +31,7 @@ options {
public void throwLastRecognitionError() throws InvalidRequestException
{
if (recognitionErrors.size() > 0)
- {
- InvalidRequestException invalidExcep = new
InvalidRequestException();
- invalidExcep.why =
recognitionErrors.get((recognitionErrors.size()-1));
- throw invalidExcep;
- }
+ throw new
InvalidRequestException(recognitionErrors.get((recognitionErrors.size()-1)));
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1054140&r1=1054139&r2=1054140&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri
Dec 31 17:24:45 2010
@@ -30,11 +30,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.antlr.runtime.*;
-import org.apache.cassandra.avro.Column;
-import org.apache.cassandra.avro.*;
-import org.apache.cassandra.avro.InvalidRequestException;
-import org.apache.cassandra.avro.TimedOutException;
-import org.apache.cassandra.avro.UnavailableException;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.AbstractBounds;
@@ -43,17 +38,11 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.IndexClause;
-import org.apache.cassandra.thrift.IndexExpression;
-import org.apache.cassandra.thrift.IndexOperator;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.*;
-import static
org.apache.cassandra.avro.AvroErrorFactory.newInvalidRequestException;
-import static
org.apache.cassandra.avro.AvroErrorFactory.newUnavailableException;
-import static org.apache.cassandra.avro.AvroValidation.validateColumnFamily;
-import static org.apache.cassandra.avro.AvroValidation.validateKey;
+import static org.apache.cassandra.thrift.ThriftValidation.validateKey;
+import static
org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily;
public class QueryProcessor
{
@@ -104,16 +93,6 @@ public class QueryProcessor
{
throw new RuntimeException(e);
}
- catch (org.apache.cassandra.thrift.UnavailableException e)
- {
- UnavailableException error = new UnavailableException();
- error.initCause(e);
- throw error;
- }
- catch (org.apache.cassandra.thrift.InvalidRequestException e)
- {
- throw newInvalidRequestException(e);
- }
return rows;
}
@@ -183,14 +162,6 @@ public class QueryProcessor
thriftSlicePredicate,
select.getConsistencyLevel());
}
- catch (org.apache.cassandra.thrift.UnavailableException ex)
- {
- UnavailableException avroEx = new UnavailableException();
- avroEx.why = ex.getMessage();
- if (avroEx.why == null || avroEx.why.length() == 0)
- avroEx.why = "StorageProxy.scan() failed because of
insufficent responses.";
- throw avroEx;
- }
catch (IOException e)
{
throw new RuntimeException(e);
@@ -267,23 +238,23 @@ public class QueryProcessor
private static void validateSelect(String keyspace, SelectStatement
select) throws InvalidRequestException
{
if (select.isCountOperation() && (select.isKeyRange() ||
select.getKeys().size() < 1))
- throw newInvalidRequestException("Counts can only be performed for
a single record (Hint: KEY=term)");
+ throw new InvalidRequestException("Counts can only be performed
for a single record (Hint: KEY=term)");
// Finish key w/o start key (KEY < foo)
if (!select.isKeyRange() && (select.getKeyFinish() != null))
- throw newInvalidRequestException("Key range clauses must include a
start key (i.e. KEY > term)");
+ throw new InvalidRequestException("Key range clauses must include
a start key (i.e. KEY > term)");
// Key range and by-key(s) combined (KEY > foo AND KEY = bar)
if (select.isKeyRange() && select.getKeys().size() > 0)
- throw newInvalidRequestException("You cannot combine key range and
by-key clauses in a SELECT");
+ throw new InvalidRequestException("You cannot combine key range
and by-key clauses in a SELECT");
// Start and finish keys, *and* column relations (KEY > foo AND KEY <
bar and name1 = value1).
if (select.isKeyRange() && (select.getKeyFinish() != null) &&
(select.getColumnRelations().size() > 0))
- throw newInvalidRequestException("You cannot combine key range and
by-column clauses in a SELECT");
+ throw new InvalidRequestException("You cannot combine key range
and by-column clauses in a SELECT");
// Multiget scenario (KEY = foo AND KEY = bar ...)
if (select.getKeys().size() > 1)
- throw newInvalidRequestException("SELECTs can contain only by
by-key clause");
+ throw new InvalidRequestException("SELECTs can contain only by
by-key clause");
if (select.getColumnRelations().size() > 0)
{
@@ -293,7 +264,7 @@ public class QueryProcessor
if ((relation.operator().equals(RelationType.EQ)) &&
indexed.contains(relation.getEntity().getByteBuffer()))
return;
}
- throw newInvalidRequestException("No indexed columns present in
by-columns clause with \"equals\" operator");
+ throw new InvalidRequestException("No indexed columns present in
by-columns clause with \"equals\" operator");
}
}
@@ -328,9 +299,9 @@ public class QueryProcessor
{
avroResult.type = CqlResultType.INT;
if (rows.size() > 0)
- avroResult.num = rows.get(0).cf != null ?
rows.get(0).cf.getSortedColumns().size() : 0;
+ avroResult.setNum(rows.get(0).cf != null ?
rows.get(0).cf.getSortedColumns().size() : 0);
else
- avroResult.num = 0;
+ avroResult.setNum(0);
return avroResult;
}
}
@@ -394,7 +365,7 @@ public class QueryProcessor
for (UpdateStatement up : batch.getUpdates())
if (up.isSetConsistencyLevel())
- throw newInvalidRequestException(
+ throw new InvalidRequestException(
"Consistency level must be set on the BATCH,
not individual UPDATE statements");
batchUpdate(keyspace, batch.getUpdates(),
batch.getConsistencyLevel());
@@ -414,17 +385,13 @@ public class QueryProcessor
{
StorageProxy.truncateBlocking(keyspace, columnFamily);
}
- catch (org.apache.cassandra.thrift.UnavailableException e)
- {
- throw newUnavailableException(e);
- }
catch (TimeoutException e)
{
- throw newUnavailableException(e);
+ throw (UnavailableException) new
UnavailableException().initCause(e);
}
catch (IOException e)
{
- throw newUnavailableException(e);
+ throw (UnavailableException) new
UnavailableException().initCause(e);
}
avroResult.type = CqlResultType.VOID;
@@ -452,10 +419,6 @@ public class QueryProcessor
{
StorageProxy.mutate(rowMutations,
delete.getConsistencyLevel());
}
- catch (org.apache.cassandra.thrift.UnavailableException e)
- {
- throw newUnavailableException(e);
- }
catch (TimeoutException e)
{
throw new TimedOutException();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1054140&r1=1054139&r2=1054140&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Fri Dec 31 17:24:45 2010
@@ -18,7 +18,10 @@
package org.apache.cassandra.thrift;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.Map.Entry;
@@ -26,14 +29,18 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.antlr.runtime.RecognitionException;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -1127,5 +1134,64 @@ public class CassandraServer implements
internal_remove(key, path, System.currentTimeMillis(),
consistency_level);
}
+ @Override
+ public CqlResult execute_cql_query(ByteBuffer query, Compression
compression)
+ throws InvalidRequestException, UnavailableException,
TimedOutException, TException
+ {
+ String queryString = null;
+
+ // Decompress the query string.
+ try
+ {
+ switch (compression)
+ {
+ case GZIP:
+ ByteArrayOutputStream byteArray = new
ByteArrayOutputStream();
+ byte[] outBuffer = new byte[1024], inBuffer = new
byte[1024];
+
+ Inflater decompressor = new Inflater();
+
+ int lenRead = 0;
+ while (true)
+ {
+ if (decompressor.needsInput())
+ lenRead = query.remaining() < 1024 ?
query.remaining() : 1024;
+ query.get(inBuffer, 0, lenRead);
+ decompressor.setInput(inBuffer, 0, lenRead);
+
+ int lenWrite = 0;
+ while ((lenWrite = decompressor.inflate(outBuffer))
!=0)
+ byteArray.write(outBuffer, 0, lenWrite);
+
+ if (decompressor.finished())
+ break;
+ }
+
+ decompressor.end();
+
+ queryString = new String(byteArray.toByteArray(), 0,
byteArray.size(), "UTF-8");
+ }
+ }
+ catch (DataFormatException e)
+ {
+ throw new InvalidRequestException("Error deflating query string.");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new InvalidRequestException("Unknown query string
encoding.");
+ }
+
+ try
+ {
+ return QueryProcessor.process(queryString, state());
+ }
+ catch (RecognitionException e)
+ {
+ InvalidRequestException ire = new InvalidRequestException("Invalid
or malformed CQL query string");
+ ire.initCause(e);
+ throw ire;
+ }
+ }
+
// main method moved to CassandraDaemon
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1054140&r1=1054139&r2=1054140&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java
Fri Dec 31 17:24:45 2010
@@ -44,7 +44,7 @@ public class ThriftValidation
{
private static final Logger logger =
LoggerFactory.getLogger(ThriftValidation.class);
- static void validateKey(ByteBuffer key) throws InvalidRequestException
+ public static void validateKey(ByteBuffer key) throws
InvalidRequestException
{
if (key == null || key.remaining() == 0)
{