Make prepared statements global instead of per-connection. Patch by slebresne; reviewed by jbellis for CASSANDRA-4449
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ccca5f1e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ccca5f1e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ccca5f1e Branch: refs/heads/trunk Commit: ccca5f1e39c220ddc7ce68883622667229e28113 Parents: f6bb970 Author: Sylvain Lebresne <[email protected]> Authored: Wed Sep 26 14:05:09 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Sep 26 14:05:09 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol.spec | 4 + .../org/apache/cassandra/cql3/QueryProcessor.java | 60 +++++++++--- .../apache/cassandra/exceptions/ExceptionCode.java | 3 +- .../exceptions/PreparedQueryNotFoundException.java | 40 ++++++++ .../org/apache/cassandra/service/ClientState.java | 12 --- .../apache/cassandra/thrift/CassandraServer.java | 9 +- .../org/apache/cassandra/transport/CBUtil.java | 20 ++++ .../org/apache/cassandra/transport/Client.java | 3 +- .../apache/cassandra/transport/SimpleClient.java | 2 +- .../cassandra/transport/messages/ErrorMessage.java | 11 ++ .../transport/messages/ExecuteMessage.java | 20 +++-- .../transport/messages/PrepareMessage.java | 2 +- .../transport/messages/ResultMessage.java | 27 ++++-- src/java/org/apache/cassandra/utils/MD5Digest.java | 75 +++++++++++++++ 15 files changed, 242 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7314f99..b70c412 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -3,6 +3,7 @@ * adjust blockFor calculation to account for pending ranges due to node movement (CASSANDRA-833) * Change CQL version to 3.0.0 and stop accepting 3.0.0-beta1 (CASSANDRA-4649) + * Make 
prepared statement global instead of per connection (CASSANDRA-4449) 1.2-beta1 * add atomic_batch_mutate (CASSANDRA-4542, -4635) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index 0dd5c14..9a44697 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -520,3 +520,7 @@ Table of Contents already exists. If the query was attempting to create a keyspace, <table> will be present but will be the empty string. + 0x2500 Unprepared: Can be thrown while a prepared statement tries to be + executed if the provide prepared statement ID is not known by + this host. The rest of the ERROR message body will be [bytes] + representing the unknown ID. http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 146f775..856f6fd 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cql3; import java.nio.ByteBuffer; import java.util.*; +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import org.antlr.runtime.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.thrift.SchemaDisagreementException; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.SemanticVersion; public class QueryProcessor @@ -42,6 +44,26 @@ public class QueryProcessor private static final Logger logger = 
LoggerFactory.getLogger(QueryProcessor.class); + public static final int MAX_CACHE_PREPARED = 100000; // Enough to keep buggy clients from OOM'ing us + private static final Map<MD5Digest, CQLStatement> preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, CQLStatement>() + .maximumWeightedCapacity(MAX_CACHE_PREPARED) + .build(); + + private static final Map<Integer, CQLStatement> thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>() + .maximumWeightedCapacity(MAX_CACHE_PREPARED) + .build(); + + + public static CQLStatement getPrepared(MD5Digest id) + { + return preparedStatements.get(id); + } + + public static CQLStatement getPrepared(Integer id) + { + return thriftPreparedStatements.get(id); + } + public static void validateKey(ByteBuffer key) throws InvalidRequestException { if (key == null || key.remaining() == 0) @@ -151,20 +173,38 @@ public class QueryProcessor } } - public static ResultMessage.Prepared prepare(String queryString, ClientState clientState) + public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift) throws RequestValidationException { logger.trace("CQL QUERY: {}", queryString); ParsedStatement.Prepared prepared = getStatement(queryString, clientState); - int statementId = makeStatementId(queryString); - clientState.getCQL3Prepared().put(statementId, prepared.statement); - logger.trace(String.format("Stored prepared statement #%d with %d bind markers", - statementId, - prepared.statement.getBoundsTerms())); + ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState, prepared, forThrift); assert prepared.statement.getBoundsTerms() == prepared.boundNames.size(); - return new ResultMessage.Prepared(statementId, prepared.boundNames); + return msg; + } + + private static ResultMessage.Prepared storePreparedStatement(String queryString, ClientState clientState, ParsedStatement.Prepared prepared, boolean forThrift) + { + if (forThrift) 
+ { + int statementId = queryString.hashCode(); + thriftPreparedStatements.put(statementId, prepared.statement); + logger.trace(String.format("Stored prepared statement #%d with %d bind markers", + statementId, + prepared.statement.getBoundsTerms())); + return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames); + } + else + { + MD5Digest statementId = MD5Digest.compute(queryString); + logger.trace(String.format("Stored prepared statement %s with %d bind markers", + statementId, + prepared.statement.getBoundsTerms())); + preparedStatements.put(statementId, prepared.statement); + return new ResultMessage.Prepared(statementId, prepared.boundNames); + } } public static ResultMessage processPrepared(CQLStatement statement, ClientState clientState, List<ByteBuffer> variables) @@ -188,12 +228,6 @@ public class QueryProcessor return processStatement(statement, clientState, variables); } - private static final int makeStatementId(String cql) - { - // use the hash of the string till something better is provided - return cql.hashCode(); - } - private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/ExceptionCode.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java index 13fcc6a..e8dfb4e 100644 --- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java +++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java @@ -43,7 +43,8 @@ public enum ExceptionCode UNAUTHORIZED (0x2100), INVALID (0x2200), CONFIG_ERROR (0x2300), - ALREADY_EXISTS (0x2400); + ALREADY_EXISTS (0x2400), + UNPREPARED (0x2500); public final int value; private static final Map<Integer, ExceptionCode> valueToCode = new HashMap<Integer, 
ExceptionCode>(ExceptionCode.values().length); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java new file mode 100644 index 0000000..07502c8 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/PreparedQueryNotFoundException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.exceptions; + +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.utils.MD5Digest; + +public class PreparedQueryNotFoundException extends RequestValidationException +{ + public final MD5Digest id; + + public PreparedQueryNotFoundException(MD5Digest id) + { + super(ExceptionCode.UNPREPARED, makeMsg(id)); + this.id = id; + } + + private static String makeMsg(MD5Digest id) + { + return String.format("Prepared query with ID %d not found" + + " (either the query was not prepared on this host (maybe the host has been restarted?)" + + " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)", + id, QueryProcessor.MAX_CACHE_PREPARED, id); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index ef640af..6266a3b 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -65,12 +65,6 @@ public class ClientState } }; - private final Map<Integer, org.apache.cassandra.cql3.CQLStatement> cql3Prepared = new LinkedHashMap<Integer, org.apache.cassandra.cql3.CQLStatement>(16, 0.75f, true) { - protected boolean removeEldestEntry(Map.Entry<Integer, org.apache.cassandra.cql3.CQLStatement> eldest) { - return size() > MAX_CACHE_PREPARED; - } - }; - private long clock; // internalCall is used to mark ClientState as used by some internal component @@ -96,11 +90,6 @@ public class ClientState return prepared; } - public Map<Integer, org.apache.cassandra.cql3.CQLStatement> getCQL3Prepared() - { - return cql3Prepared; - } - public String getRawKeyspace() { return keyspace; @@ -191,7 +180,6 @@ public class ClientState preparedTracingSession = null; 
resourceClear(); prepared.clear(); - cql3Prepared.clear(); } public void hasKeyspaceAccess(String keyspace, Permission perm) throws UnauthorizedException, InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 73ecd29..4ab19bb 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1705,7 +1705,7 @@ public class CassandraServer implements Cassandra.Iface if (cState.getCQLVersion().major == 2) return QueryProcessor.prepare(queryString, cState); else - return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState).toThriftPreparedResult(); + return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult(); } catch (RequestValidationException e) { @@ -1741,10 +1741,13 @@ public class CassandraServer implements Cassandra.Iface } else { - org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId); + org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId); if (statement == null) - throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId)); + throw new InvalidRequestException(String.format("Prepared query with ID %d not found" + + " (either the query was not prepared on this host (maybe the host has been restarted?)" + + " or you have prepared more than %d queries and queries %d has been evicted from the internal cache)", + itemId, org.apache.cassandra.cql3.QueryProcessor.MAX_CACHE_PREPARED, itemId)); logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index b977f35..fe8863a 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -111,6 +111,26 @@ public abstract class CBUtil return ChannelBuffers.wrappedBuffer(shortToCB(bytes.readableBytes()), bytes); } + public static ChannelBuffer bytesToCB(byte[] bytes) + { + return ChannelBuffers.wrappedBuffer(shortToCB(bytes.length), ChannelBuffers.wrappedBuffer(bytes)); + } + + public static byte[] readBytes(ChannelBuffer cb) + { + try + { + int length = cb.readUnsignedShort(); + byte[] bytes = new byte[length]; + cb.readBytes(bytes); + return bytes; + } + catch (IndexOutOfBoundsException e) + { + throw new ProtocolException("Not enough bytes to read a byte array preceded by it's 2 bytes length"); + } + } + public static ChannelBuffer longStringToCB(String str) { ChannelBuffer bytes = bytes(str); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index b9e00fa..3b4ace9 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -27,6 +27,7 @@ import com.google.common.base.Splitter; import org.apache.cassandra.transport.messages.*; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.utils.Hex; public class Client extends SimpleClient { @@ -109,7 +110,7 @@ public class Client extends SimpleClient { try { - int id = Integer.parseInt(iter.next()); + byte[] id = 
Hex.hexToBytes(iter.next()); List<ByteBuffer> values = new ArrayList<ByteBuffer>(); while(iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index ea0a3df..8132e65 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -126,7 +126,7 @@ public class SimpleClient return (ResultMessage.Prepared)msg; } - public ResultMessage executePrepared(int statementId, List<ByteBuffer> values) + public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values) { Message.Response msg = execute(new ExecuteMessage(statementId, values)); assert msg instanceof ResultMessage; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index ecb387b..8ed8e94 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -33,6 +33,7 @@ import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ServerError; import org.apache.cassandra.thrift.AuthenticationException; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MD5Digest; /** * Message to indicate an error to the client. 
@@ -91,6 +92,12 @@ public class ErrorMessage extends Message.Response te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0); } break; + case UNPREPARED: + { + MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body)); + te = new PreparedQueryNotFoundException(id); + } + break; case SYNTAX_ERROR: te = new SyntaxException(msg); break; @@ -145,6 +152,10 @@ public class ErrorMessage extends Message.Response if (readEx != null) acb.writeByte((byte)(readEx.dataPresent ? 1 : 0)); break; + case UNPREPARED: + PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error; + acb = CBUtil.bytesToCB(pqnfe.id.bytes); + break; case ALREADY_EXISTS: AlreadyExistsException aee = (AlreadyExistsException)msg.error; acb = ChannelBuffers.wrappedBuffer(CBUtil.stringToCB(aee.ksName), http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 4172862..4400d12 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -25,8 +25,9 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryProcessor; -import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.transport.*; +import org.apache.cassandra.utils.MD5Digest; public class ExecuteMessage extends Message.Request { @@ -34,7 +35,7 @@ public class ExecuteMessage extends Message.Request { public ExecuteMessage decode(ChannelBuffer body) { - int id = body.readInt(); + byte[] id = CBUtil.readBytes(body); int count = 
body.readUnsignedShort(); List<ByteBuffer> values = new ArrayList<ByteBuffer>(count); @@ -53,7 +54,7 @@ public class ExecuteMessage extends Message.Request // - options int vs = msg.values.size(); CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, vs); - builder.add(CBUtil.intToCB(msg.statementId)); + builder.add(CBUtil.bytesToCB(msg.statementId.bytes)); builder.add(CBUtil.shortToCB(vs)); // Values @@ -64,10 +65,15 @@ public class ExecuteMessage extends Message.Request } }; - public final int statementId; + public final MD5Digest statementId; public final List<ByteBuffer> values; - public ExecuteMessage(int statementId, List<ByteBuffer> values) + public ExecuteMessage(byte[] statementId, List<ByteBuffer> values) + { + this(MD5Digest.wrap(statementId), values); + } + + public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values) { super(Message.Type.EXECUTE); this.statementId = statementId; @@ -84,10 +90,10 @@ public class ExecuteMessage extends Message.Request try { ServerConnection c = (ServerConnection)connection; - CQLStatement statement = c.clientState().getCQL3Prepared().get(statementId); + CQLStatement statement = QueryProcessor.getPrepared(statementId); if (statement == null) - throw new InvalidRequestException(String.format("Prepared query with ID %d not found", statementId)); + throw new PreparedQueryNotFoundException(statementId); return QueryProcessor.processPrepared(statement, c.clientState(), values); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 5c2636a..382e834 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ 
-55,7 +55,7 @@ public class PrepareMessage extends Message.Request { try { - return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState()); + return QueryProcessor.prepare(query, ((ServerConnection)connection).clientState(), false); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index 6b63948..d5009e9 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.thrift.CqlPreparedResult; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.CqlResultType; +import org.apache.cassandra.utils.MD5Digest; public abstract class ResultMessage extends Message.Response { @@ -248,30 +249,40 @@ public abstract class ResultMessage extends Message.Response { public ResultMessage decode(ChannelBuffer body) { - int id = body.readInt(); - return new Prepared(id, ResultSet.Metadata.codec.decode(body)); + MD5Digest id = MD5Digest.wrap(CBUtil.readBytes(body)); + return new Prepared(id, -1, ResultSet.Metadata.codec.decode(body)); } public ChannelBuffer encode(ResultMessage msg) { assert msg instanceof Prepared; Prepared prepared = (Prepared)msg; - return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId), ResultSet.Metadata.codec.encode(prepared.metadata)); + assert prepared.statementId != null; + return ChannelBuffers.wrappedBuffer(CBUtil.bytesToCB(prepared.statementId.bytes), ResultSet.Metadata.codec.encode(prepared.metadata)); } }; - public final int statementId; + public final MD5Digest statementId; public 
final ResultSet.Metadata metadata; - public Prepared(int statementId, List<ColumnSpecification> names) + // statement id for CQL-over-thrift compatibility. The binary protocol ignore that. + private final int thriftStatementId; + + public Prepared(MD5Digest statementId, List<ColumnSpecification> names) + { + this(statementId, -1, new ResultSet.Metadata(names)); + } + + public static Prepared forThrift(int statementId, List<ColumnSpecification> names) { - this(statementId, new ResultSet.Metadata(names)); + return new Prepared(null, statementId, new ResultSet.Metadata(names)); } - private Prepared(int statementId, ResultSet.Metadata metadata) + private Prepared(MD5Digest statementId, int thriftStatementId, ResultSet.Metadata metadata) { super(Kind.PREPARED); this.statementId = statementId; + this.thriftStatementId = thriftStatementId; this.metadata = metadata; } @@ -294,7 +305,7 @@ public abstract class ResultMessage extends Message.Response namesString.add(name.toString()); typesString.add(TypeParser.getShortName(name.type)); } - return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString); + return new CqlPreparedResult(thriftStatementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/ccca5f1e/src/java/org/apache/cassandra/utils/MD5Digest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java new file mode 100644 index 0000000..59c1aba --- /dev/null +++ b/src/java/org/apache/cassandra/utils/MD5Digest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.Arrays; + +/** + * The result of the computation of an MD5 digest. + * + * A MD5 is really just a byte[] but arrays are a no go as map keys. We could + * wrap it in a ByteBuffer but: + * 1. MD5Digest is a more explicit name than ByteBuffer to represent a md5. + * 2. Using our own class allows to use our FastByteComparison for equals. + */ +public class MD5Digest +{ + public final byte[] bytes; + + private MD5Digest(byte[] bytes) + { + this.bytes = bytes; + } + + public static MD5Digest wrap(byte[] digest) + { + return new MD5Digest(digest); + } + + public static MD5Digest compute(byte[] toHash) + { + return new MD5Digest(FBUtilities.threadLocalMD5Digest().digest(toHash)); + } + + public static MD5Digest compute(String toHash) + { + return compute(toHash.getBytes()); + } + + @Override + public final int hashCode() + { + return Arrays.hashCode(bytes); + } + + @Override + public final boolean equals(Object o) + { + if(!(o instanceof MD5Digest)) + return false; + MD5Digest that = (MD5Digest)o; + // handles nulls properly + return FBUtilities.compareUnsigned(this.bytes, that.bytes, 0, 0, this.bytes.length, that.bytes.length) == 0; + } + + @Override + public String toString() + { + return Hex.bytesToHex(bytes); + } +}
