Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=881998&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed Nov 18 23:27:14 2009 @@ -0,0 +1,190 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.io.DataInputBuffer; +import org.apache.cassandra.io.DataOutputBuffer; +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.service.ColumnParent; +import org.apache.cassandra.service.SlicePredicate; +import org.apache.cassandra.service.SliceRange; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.thrift.TDeserializer; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class RangeSliceCommand +{ + private static final SliceCommandSerializer serializer = new SliceCommandSerializer(); + + public final String keyspace; + + public final String column_family; + public final byte[] super_column; + + public final SlicePredicate predicate; + + public final String start_key; + public final String finish_key; + public final int max_keys; + + public RangeSliceCommand(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String finish_key, int max_keys) + { + this.keyspace 
= keyspace; + column_family = column_parent.getColumn_family(); + super_column = column_parent.getSuper_column(); + this.predicate = predicate; + this.start_key = start_key; + this.finish_key = finish_key; + this.max_keys = max_keys; + } + + public RangeSliceCommand(RangeSliceCommand cmd, int max_keys) + { + this(cmd.keyspace, + new ColumnParent(cmd.column_family, cmd.super_column), + new SlicePredicate(cmd.predicate), + cmd.start_key, + cmd.finish_key, + max_keys); + + } + + public Message getMessage() throws IOException + { + DataOutputBuffer dob = new DataOutputBuffer(); + serializer.serialize(this, dob); + return new Message(FBUtilities.getLocalAddress(), + StageManager.readStage_, + StorageService.rangeSliceVerbHandler_, + Arrays.copyOf(dob.getData(), dob.getLength())); + } + + public static RangeSliceCommand read(Message message) throws IOException + { + byte[] bytes = message.getMessageBody(); + DataInputBuffer dib = new DataInputBuffer(); + dib.reset(bytes, bytes.length); + return serializer.deserialize(new DataInputStream(dib)); + } +} + +class SliceCommandSerializer implements ICompactSerializer<RangeSliceCommand> +{ + public void serialize(RangeSliceCommand sliceCommand, DataOutputStream dos) throws IOException + { + dos.writeUTF(sliceCommand.keyspace); + dos.writeUTF(sliceCommand.column_family); + dos.writeInt(sliceCommand.super_column == null ? 
0 : sliceCommand.super_column.length); + if (sliceCommand.super_column != null) + dos.write(sliceCommand.super_column); + + TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); + try + { + byte[] serPred = ser.serialize(sliceCommand.predicate); + dos.writeInt(serPred.length); + dos.write(serPred); + } + catch (TException ex) + { + throw new IOException(ex); + } + + dos.writeUTF(sliceCommand.start_key); + dos.writeUTF(sliceCommand.finish_key); + dos.writeInt(sliceCommand.max_keys); + } + + public RangeSliceCommand deserialize(DataInputStream dis) throws IOException + { + String keyspace = dis.readUTF(); + String column_family = dis.readUTF(); + + int scLength = dis.readInt(); + byte[] super_column = null; + if (scLength > 0) + super_column = readBuf(scLength, dis); + + byte[] predBytes = new byte[dis.readInt()]; + dis.readFully(predBytes); + TDeserializer dser = new TDeserializer(new TBinaryProtocol.Factory()); + SlicePredicate pred = new SlicePredicate(); + try + { + dser.deserialize(pred, predBytes); + } + catch (TException ex) + { + throw new IOException(ex); + } + + String start_key = dis.readUTF(); + String finish_key = dis.readUTF(); + int max_keys = dis.readInt(); + return new RangeSliceCommand(keyspace, + new ColumnParent(column_family, super_column), + pred, + start_key, + finish_key, + max_keys); + + } + + static byte[] readBuf(int len, DataInputStream dis) throws IOException + { + byte[] buf = new byte[len]; + int read = 0; + while (read < len) + read = dis.read(buf, read, len - read); + return buf; + } +}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java?rev=881998&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceReply.java Wed Nov 18 23:27:14 2009 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.DataInputBuffer; +import org.apache.cassandra.io.DataOutputBuffer; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.util.*; + +public class RangeSliceReply +{ + public final List<Row> rows; + public final boolean rangeCompletedLocally; + + public RangeSliceReply(List<Row> rows, boolean rangeCompletedLocally) + { + this.rows = rows; + this.rangeCompletedLocally = rangeCompletedLocally; + } + + public Message getReply(Message originalMessage) throws IOException + { + DataOutputBuffer dob = new DataOutputBuffer(); + dob.writeBoolean(rangeCompletedLocally); + dob.writeInt(rows.size()); + for (Row row : rows) + { + Row.serializer().serialize(row, dob); + } + byte[] data = Arrays.copyOf(dob.getData(), dob.getLength()); + return originalMessage.getReply(FBUtilities.getLocalAddress(), data); + } + + @Override + public String toString() + { + return "RangeSliceReply{" + + "rows=" + StringUtils.join(rows, ",") + + ", rangeCompletedLocally=" + rangeCompletedLocally + + '}'; + } + + public static RangeSliceReply read(byte[] body) throws IOException + { + DataInputBuffer bufIn = new DataInputBuffer(); + bufIn.reset(body, body.length); + boolean completed = bufIn.readBoolean(); + int rowCount = bufIn.readInt(); + List<Row> rows = new ArrayList<Row>(rowCount); + for (int i = 0; i < rowCount; i++) + { + rows.add(Row.serializer().deserialize(bufIn)); + } + return new RangeSliceReply(rows, completed); + } +} Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=881998&r1=881997&r2=881998&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Wed Nov 18 23:27:14 2009 @@ -118,6 +118,7 @@ header_.setMessageId(id); } + // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len public Message getReply(InetAddress from, byte[] args) { Header header = new Header(getMessageId(), from, MessagingService.responseStage_, MessagingService.responseVerbHandler_); Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=881998&r1=881997&r2=881998&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed Nov 18 23:27:14 2009 @@ -33,10 +33,8 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.db.filter.QueryPath; -import java.net.InetAddress; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.LogUtil; -import org.apache.cassandra.dht.Token; import org.apache.thrift.TException; import flexjson.JSONSerializer; @@ -557,6 +555,60 @@ return columnFamiliesMap; } + public List<KeySlice> get_range_slice(String keyspace, ColumnParent column_parent, SlicePredicate predicate, String start_key, String finish_key, int maxRows, int consistency_level) + throws InvalidRequestException, UnavailableException, TException + { + if 
(logger.isDebugEnabled()) + logger.debug("range_slice"); + if (predicate.getSlice_range() != null) + ThriftValidation.validateRange(keyspace, column_parent, predicate.getSlice_range()); + else + ThriftValidation.validateColumns(keyspace, column_parent, predicate.getColumn_names()); + if (!StorageService.getPartitioner().preservesOrder()) + { + throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner"); + } + if (maxRows <= 0) + { + throw new InvalidRequestException("maxRows must be positive"); + } + + Map<String, Collection<IColumn>> colMap; // keys are sorted. + try + { + colMap = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key, maxRows)); + if (colMap == null) + throw new RuntimeException("KeySlice list should never be null."); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + List<KeySlice> keySlices = new ArrayList<KeySlice>(colMap.size()); + for (String key : colMap.keySet()) + { + Collection<IColumn> dbList = colMap.get(key); + List<ColumnOrSuperColumn> svcList = new ArrayList<ColumnOrSuperColumn>(dbList.size()); + for (org.apache.cassandra.db.IColumn col : dbList) + { + if (col instanceof org.apache.cassandra.db.Column) + svcList.add(new ColumnOrSuperColumn(new org.apache.cassandra.service.Column(col.name(), col.value(), col.timestamp()), null)); + else if (col instanceof org.apache.cassandra.db.SuperColumn) + { + Collection<IColumn> subICols = col.getSubColumns(); + List<org.apache.cassandra.service.Column> subCols = new ArrayList<org.apache.cassandra.service.Column>(subICols.size()); + for (IColumn subCol : subICols) + subCols.add(new org.apache.cassandra.service.Column(subCol.name(), subCol.value(), subCol.timestamp())); + svcList.add(new ColumnOrSuperColumn(null, new org.apache.cassandra.service.SuperColumn(col.name(), subCols))); + } + } + keySlices.add(new KeySlice(key, svcList)); + } + + return keySlices; + } + public 
List<String> get_key_range(String tablename, String columnFamily, String startWith, String stopAt, int maxResults, int consistency_level) throws InvalidRequestException, TException, UnavailableException { Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=881998&view=auto ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (added) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Wed Nov 18 23:27:14 2009 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import org.apache.cassandra.db.RangeSliceCommand; +import org.apache.cassandra.db.RangeSliceReply; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.log4j.Logger; + +public class RangeSliceVerbHandler implements IVerbHandler +{ + + private static final Logger logger = Logger.getLogger(RangeSliceVerbHandler.class); + + public void doVerb(Message message) + { + try + { + RangeSliceCommand command = RangeSliceCommand.read(message); + RangeSliceReply reply = Table.open(command.keyspace).getColumnFamilyStore(command.column_family).getRangeSlice( + command.super_column, + command.start_key, + command.finish_key, + command.max_keys, + command.predicate.slice_range, + command.predicate.column_names); + Message response = reply.getReply(message); + if (logger.isDebugEnabled()) + logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom()); + MessagingService.instance().sendOneWay(response, message.getFrom()); + } + catch (Exception ex) + { + throw new RuntimeException(ex); + } + } +} Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=881998&r1=881997&r2=881998&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Nov 18 23:27:14 2009 @@ -56,6 +56,7 @@ private static TimedStatsDeque readStats = new TimedStatsDeque(60000); private static TimedStatsDeque rangeStats = new TimedStatsDeque(60000); private static TimedStatsDeque writeStats = new TimedStatsDeque(60000); + private 
StorageProxy() {} static { @@ -70,6 +71,23 @@ } } + private static Comparator<String> keyComparator = new Comparator<String>() + { + public int compare(String o1, String o2) + { + IPartitioner p = StorageService.getPartitioner(); + return p.getDecoratedKeyComparator().compare(p.decorateKey(o1), p.decorateKey(o2)); + } + }; + + private static Comparator<Row> rowComparator = new Comparator<Row>() + { + public int compare(Row r1, Row r2) + { + return keyComparator.compare(r1.key(), r2.key()); + } + }; + /** * Use this method to have this RowMutation applied * across all replicas. This method will take care @@ -508,6 +526,95 @@ return rows; } + static Map<String, Collection<IColumn>> getRangeSlice(RangeSliceCommand rawCommand) throws IOException, UnavailableException + { + long startTime = System.currentTimeMillis(); + TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata(); + RangeSliceCommand command = rawCommand; + + InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.start_key); + InetAddress startEndpoint = endPoint; + InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint(); + + TreeSet<Row> allRows = new TreeSet<Row>(rowComparator); + do + { + + Message message = command.getMessage(); + if (logger.isDebugEnabled()) + logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint); + IAsyncResult iar = MessagingService.instance().sendRR(message, endPoint); + byte[] responseBody; + try + { + responseBody = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException ex) + { + throw new RuntimeException(ex); + } + RangeSliceReply reply = RangeSliceReply.read(responseBody); + List<Row> rangeRows = new ArrayList<Row>(reply.rows); + + // combine these what what has been seen so far. 
+ if (rangeRows.size() > 0) + { + if (allRows.size() > 0) + { + if (keyComparator.compare(rangeRows.get(rangeRows.size() - 1).key(), allRows.first().key()) <= 0) + { + // unlikely, but possible + if (rangeRows.get(rangeRows.size() - 1).equals(allRows.first().key())) + { + rangeRows.remove(rangeRows.size() - 1); + } + // put all from rangeRows into allRows. + allRows.addAll(rangeRows); + } + else if (keyComparator.compare(allRows.last().key(), rangeRows.get(0).key()) <= 0) + { + // common case. deal with simple start/end key overlaps + if (allRows.last().key().equals(rangeRows.get(0))) + { + allRows.remove(allRows.last().key()); + } + allRows.addAll(rangeRows); // todo: check logic. + } + else + { + // deal with potential large overlap from scanning the first endpoint, which contains + // both the smallest and largest keys + allRows.addAll(rangeRows); // todo: check logic. + } + } + else + allRows.addAll(rangeRows); // todo: check logic. + } + + if (allRows.size() >= rawCommand.max_keys || reply.rangeCompletedLocally) + break; + + do + { + endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy + } + while (!FailureDetector.instance().isAlive(endPoint)); + int maxResults = endPoint == wrapEndpoint ? rawCommand.max_keys : rawCommand.max_keys - allRows.size(); + command = new RangeSliceCommand(command, maxResults); + } + while (!endPoint.equals(startEndpoint)); + + Map<String, Collection<IColumn>> results = new TreeMap<String, Collection<IColumn>>(); + for (Row row : allRows) + { + // for now, assume only one cf per row, since that is all we can specify in the Command. 
+ ColumnFamily cf = row.getColumnFamilies().iterator().next(); + results.put(row.key(),cf.getSortedColumns()); + } + rangeStats.add(System.currentTimeMillis() - startTime); + return results; + } + static List<String> getKeyRange(RangeCommand rawCommand) throws IOException, UnavailableException { long startTime = System.currentTimeMillis(); @@ -544,16 +651,7 @@ { if (allKeys.size() > 0) { - Comparator<String> comparator = new Comparator<String>() - { - public int compare(String o1, String o2) - { - IPartitioner p = StorageService.getPartitioner(); - return p.getDecoratedKeyComparator().compare(p.decorateKey(o1), p.decorateKey(o2)); - } - }; - - if (comparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0)) <= 0) + if (keyComparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0)) <= 0) { // unlikely, but possible if (rangeKeys.get(rangeKeys.size() - 1).equals(allKeys.get(0))) @@ -563,7 +661,7 @@ rangeKeys.addAll(allKeys); allKeys = rangeKeys; } - else if (comparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0)) <= 0) + else if (keyComparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0)) <= 0) { // common case. 
deal with simple start/end key overlaps if (allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0))) Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=881998&r1=881997&r2=881998&view=diff ============================================================================== --- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 18 23:27:14 2009 @@ -76,6 +76,7 @@ public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER"; public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER"; public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER"; + public final static String rangeSliceVerbHandler_ = "RANGE-SLICE-VERB-HANDLER"; public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER"; private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner(); @@ -223,6 +224,7 @@ MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler()); MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler() ); MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler()); + MessagingService.instance().registerVerbHandlers(rangeSliceVerbHandler_, new RangeSliceVerbHandler()); // see BootStrapper for a summary of how the bootstrap verbs interact MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler()); MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() ); @@ -275,7 +277,7 @@ while (isBootstrapMode) { try - { + { Thread.sleep(100); } catch (InterruptedException e) Modified: 
def test_get_slice_range(self):
    """Insert the standard column range and verify it reads back."""
    _insert_range()
    _verify_range()

def test_get_slice_super_range(self):
    """Insert the super column range and verify it reads back."""
    _insert_super_range()
    _verify_super_range()

def test_get_range_slice_super(self):
    """get_range_slice against a super column family, by subcolumn and by super column name."""
    row_keys = ['key%d' % i for i in range(1, 6)]
    col_names = ['col%d' % i for i in range(1, 6)]
    for row_key in row_keys:
        for col_name in col_names:
            path = ColumnPath('Super3', 'sc1', col_name)
            client.insert('Keyspace2', row_key, path, 'v-' + col_name, 0, ConsistencyLevel.ONE)

    # select two subcolumns of sc1 for key2..key4
    parent = ColumnParent('Super3', 'sc1')
    by_subcolumn = SlicePredicate(column_names=['col1', 'col3'])
    result = client.get_range_slice("Keyspace2", parent, by_subcolumn, 'key2', 'key4', 5, ConsistencyLevel.ONE)
    assert len(result) == 3
    first_sc = result[0].columns[0].super_column
    assert first_sc.columns[0].name == 'col1'
    assert first_sc.columns[1].name == 'col3'

    # select the super column itself for key2..key4
    parent = ColumnParent('Super3')
    by_supercolumn = SlicePredicate(column_names=['sc1'])
    result = client.get_range_slice("Keyspace2", parent, by_supercolumn, 'key2', 'key4', 5, ConsistencyLevel.ONE)
    assert len(result) == 3
    sc_names = set(row.columns[0].super_column.name for row in result)
    assert list(sc_names)[0] == 'sc1'

def test_get_range_slice(self):
    """get_range_slice against a standard column family with name and slice predicates."""
    row_keys = ['key%d' % i for i in range(1, 6)]
    col_names = ['col%d' % i for i in range(1, 6)]
    for row_key in row_keys:
        for col_name in col_names:
            path = ColumnPath('Standard1', column=col_name)
            client.insert('Keyspace1', row_key, path, 'v-' + col_name, 0, ConsistencyLevel.ONE)
    cp = ColumnParent('Standard1')
    by_name = SlicePredicate(column_names=['col1', 'col3'])

    # test column_names predicate
    result = client.get_range_slice("Keyspace1", cp, by_name, 'key2', 'key4', 5, ConsistencyLevel.ONE)
    assert len(result) == 3
    first_row = result[0].columns
    assert first_row[0].column.name == 'col1'
    assert first_row[1].column.name == 'col3'

    # row limiting via count.
    result = client.get_range_slice("Keyspace1", cp, by_name, 'key2', 'key4', 1, ConsistencyLevel.ONE)
    assert len(result) == 1

    # test column slice predicate
    forward = SlicePredicate(slice_range=SliceRange(start='col2', finish='col4', reversed=False, count=5))
    result = client.get_range_slice('Keyspace1', cp, forward, 'key1', 'key2', 5, ConsistencyLevel.ONE)
    assert len(result) == 2
    assert result[0].key == 'key1'
    assert result[1].key == 'key2'
    first_row = result[0].columns
    assert len(first_row) == 3
    assert first_row[0].column.name == 'col2'
    assert first_row[2].column.name == 'col4'

    # col limiting via count
    limited = SlicePredicate(slice_range=SliceRange(start='col2', finish='col4', reversed=False, count=2))
    result = client.get_range_slice('Keyspace1', cp, limited, 'key1', 'key2', 5, ConsistencyLevel.ONE)
    assert len(result[0].columns) == 2

    # and reversed
    backward = SlicePredicate(slice_range=SliceRange(start='col4', finish='col2', reversed=True, count=5))
    result = client.get_range_slice('Keyspace1', cp, backward, 'key1', 'key2', 5, ConsistencyLevel.ONE)
    first_row = result[0].columns
    assert first_row[0].column.name == 'col2'
    assert first_row[2].column.name == 'col4'

    # row limiting via count
    result = client.get_range_slice('Keyspace1', cp, forward, 'key1', 'key2', 1, ConsistencyLevel.ONE)
    assert len(result) == 1
rows.keys().sort() keys2 = keys.sort() - """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted""" + # Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted for key in keys: assert rows.has_key(key) == True assert rows[key] == ColumnOrSuperColumn(column=Column(timestamp=0, name='c1', value='value1')) @@ -554,18 +609,18 @@ def test_multiget_slice(self): """Insert multiple keys and retrieve them using the multiget_slice interface""" - """Generate a list of 10 keys and insert them""" + # Generate a list of 10 keys and insert them num_keys = 10 keys = ['key'+str(i) for i in range(1, num_keys+1)] _insert_multi(keys) - """Retrieve all 10 key slices""" + # Retrieve all 10 key slices rows = _big_multislice('Keyspace1', keys, ColumnParent('Standard1')) keys1 = rows.keys().sort() keys2 = keys.sort() columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS] - """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted""" + # Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted for key in keys: assert rows.has_key(key) == True assert columns == rows[key]
