http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/tools/SSTableImport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java deleted file mode 100644 index b2d63aa..0000000 --- a/src/java/org/apache/cassandra/tools/SSTableImport.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.tools; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; - -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.format.SSTableWriter; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; -import org.codehaus.jackson.map.MappingJsonFactory; -import org.codehaus.jackson.type.TypeReference; - -/** - * Create SSTables from JSON input - */ -public class SSTableImport -{ - private static final String KEYSPACE_OPTION = "K"; - private static final String COLUMN_FAMILY_OPTION = "c"; - private static final String KEY_COUNT_OPTION = "n"; - private static final String IS_SORTED_OPTION = "s"; - - private static final Options options = new Options(); - private static CommandLine cmd; - - private Integer keyCountToImport; - private final boolean isSorted; - - private static final JsonFactory factory = new MappingJsonFactory().configure( - JsonParser.Feature.INTERN_FIELD_NAMES, false); - - static - { - Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name."); - optKeyspace.setRequired(true); - 
options.addOption(optKeyspace); - - Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Table name."); - optColfamily.setRequired(true); - options.addOption(optColfamily); - - options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional).")); - options.addOption(new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional).")); - } - - private static class JsonColumn<T> - { - private ByteBuffer name; - private ByteBuffer value; - private long timestamp; - - private String kind; - // Expiring columns - private int ttl; - private int localExpirationTime; - - // Counter columns - private long timestampOfLastDelete; - - public JsonColumn(T json, CFMetaData meta) - { - if (json instanceof List) - { - CellNameType comparator = meta.comparator; - List fields = (List<?>) json; - - assert fields.size() >= 3 : "Cell definition should have at least 3"; - - name = stringAsType((String) fields.get(0), comparator.asAbstractType()); - timestamp = (Long) fields.get(2); - kind = ""; - - if (fields.size() > 3) - { - kind = (String) fields.get(3); - if (isExpiring()) - { - ttl = (Integer) fields.get(4); - localExpirationTime = (Integer) fields.get(5); - } - else if (isCounter()) - { - timestampOfLastDelete = ((Integer) fields.get(4)); - } - else if (isRangeTombstone()) - { - localExpirationTime = (Integer) fields.get(4); - } - } - - if (isDeleted()) - { - value = ByteBufferUtil.bytes((Integer) fields.get(1)); - } - else if (isRangeTombstone()) - { - value = stringAsType((String) fields.get(1), comparator.asAbstractType()); - } - else - { - assert meta.isCQL3Table() || name.hasRemaining() : "Cell name should not be empty"; - value = stringAsType((String) fields.get(1), - meta.getValueValidator(name.hasRemaining() - ? comparator.cellFromByteBuffer(name) - : meta.comparator.rowMarker(Composites.EMPTY))); - } - } - } - - public boolean isDeleted() - { - return kind.equals("d"); - } - - public boolean isExpiring() - { - return kind.equals("e"); - } - - public boolean isCounter() - { - return kind.equals("c"); - } - - public boolean isRangeTombstone() - { - return kind.equals("t"); - } - - public ByteBuffer getName() - { - return name.duplicate(); - } - - public ByteBuffer getValue() - { - return value.duplicate(); - } - } - - public SSTableImport() - { - this(null, false); - } - - public SSTableImport(boolean isSorted) - { - this(null, isSorted); - } - - public SSTableImport(Integer keyCountToImport, boolean isSorted) - { - this.keyCountToImport = keyCountToImport; - this.isSorted = isSorted; - } - - /** - * Add columns to a column family. - * - * @param row the columns associated with a row - * @param cfamily the column family to add columns to - */ - private void addColumnsToCF(List<?> row, ColumnFamily cfamily) - { - CFMetaData cfm = cfamily.metadata(); - assert cfm != null; - - for (Object c : row) - { - JsonColumn col = new JsonColumn<List>((List) c, cfm); - if (col.isRangeTombstone()) - { - Composite start = cfm.comparator.fromByteBuffer(col.getName()); - Composite end = cfm.comparator.fromByteBuffer(col.getValue()); - cfamily.addAtom(new RangeTombstone(start, end, col.timestamp, col.localExpirationTime)); - continue; - } - - assert cfm.isCQL3Table() || col.getName().hasRemaining() : "Cell name should not be empty"; - CellName cname = col.getName().hasRemaining() ? 
cfm.comparator.cellFromByteBuffer(col.getName()) - : cfm.comparator.rowMarker(Composites.EMPTY); - - if (col.isExpiring()) - { - cfamily.addColumn(new BufferExpiringCell(cname, col.getValue(), col.timestamp, col.ttl, col.localExpirationTime)); - } - else if (col.isCounter()) - { - cfamily.addColumn(new BufferCounterCell(cname, col.getValue(), col.timestamp, col.timestampOfLastDelete)); - } - else if (col.isDeleted()) - { - cfamily.addTombstone(cname, col.getValue(), col.timestamp); - } - else if (col.isRangeTombstone()) - { - CellName end = cfm.comparator.cellFromByteBuffer(col.getValue()); - cfamily.addAtom(new RangeTombstone(cname, end, col.timestamp, col.localExpirationTime)); - } - // cql3 row marker, see CASSANDRA-5852 - else if (cname.isEmpty()) - { - cfamily.addColumn(cfm.comparator.rowMarker(Composites.EMPTY), col.getValue(), col.timestamp); - } - else - { - cfamily.addColumn(cname, col.getValue(), col.timestamp); - } - } - } - - private void parseMeta(Map<?, ?> map, ColumnFamily cf, ByteBuffer superColumnName) - { - - // deletionInfo is the only metadata we store for now - if (map.containsKey("deletionInfo")) - { - Map<?, ?> unparsedDeletionInfo = (Map<?, ?>) map.get("deletionInfo"); - Number number = (Number) unparsedDeletionInfo.get("markedForDeleteAt"); - long markedForDeleteAt = number instanceof Long ? (Long) number : number.longValue(); - int localDeletionTime = (Integer) unparsedDeletionInfo.get("localDeletionTime"); - if (superColumnName == null) - cf.setDeletionInfo(new DeletionInfo(markedForDeleteAt, localDeletionTime)); - else - cf.addAtom(new RangeTombstone(SuperColumns.startOf(superColumnName), SuperColumns.endOf(superColumnName), markedForDeleteAt, localDeletionTime)); - } - } - - /** - * Convert a JSON formatted file to an SSTable. - * - * @param jsonFile the file containing JSON formatted data - * @param keyspace keyspace the data belongs to - * @param cf column family the data belongs to - * @param ssTablePath file to write the SSTable to - * - * @throws IOException for errors reading/writing input/output - */ - public int importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException - { - ColumnFamily columnFamily = ArrayBackedSortedColumns.factory.create(keyspace, cf); - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - - int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner) - : importUnsorted(jsonFile, columnFamily, ssTablePath, partitioner); - - if (importedKeys != -1) - System.out.printf("%d keys imported successfully.%n", importedKeys); - - return importedKeys; - } - - private int importUnsorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner partitioner) throws IOException - { - int importedKeys = 0; - long start = System.nanoTime(); - - Object[] data; - try (JsonParser parser = getParser(jsonFile)) - { - data = parser.readValueAs(new TypeReference<Object[]>(){}); - } - - keyCountToImport = (keyCountToImport == null) ? 
data.length : keyCountToImport; - - try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0)) - { - System.out.printf("Importing %s keys...%n", keyCountToImport); - - // sort by dk representation, but hold onto the hex version - SortedMap<DecoratedKey, Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey, Map<?, ?>>(); - - for (Object row : data) - { - Map<?, ?> rowAsMap = (Map<?, ?>) row; - decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap); - } - - for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet()) - { - if (row.getValue().containsKey("metadata")) - { - parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null); - } - - Object columns = row.getValue().get("cells"); - addColumnsToCF((List<?>) columns, columnFamily); - - - writer.append(row.getKey(), columnFamily); - columnFamily.clear(); - - importedKeys++; - - long current = System.nanoTime(); - - if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs. - { - System.out.printf("Currently imported %d keys.%n", importedKeys); - start = current; - } - - if (keyCountToImport == importedKeys) - break; - } - - writer.finish(true); - } - - return importedKeys; - } - - private int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, - IPartitioner partitioner) throws IOException - { - int importedKeys = 0; // already imported keys count - long start = System.nanoTime(); - - try (JsonParser parser = getParser(jsonFile)) - { - - if (keyCountToImport == null) - { - keyCountToImport = 0; - System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)"); - - parser.nextToken(); // START_ARRAY - while (parser.nextToken() != null) - { - parser.skipChildren(); - if (parser.getCurrentToken() == JsonToken.END_ARRAY) - break; - - keyCountToImport++; - } - } - System.out.printf("Importing %s keys...%n", keyCountToImport); - } - - try (JsonParser parser = getParser(jsonFile); // renewing parser - SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);) - { - int lineNumber = 1; - DecoratedKey prevStoredKey = null; - - parser.nextToken(); // START_ARRAY - while (parser.nextToken() != null) - { - String key = parser.getCurrentName(); - Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){}); - DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key"))); - - if (row.containsKey("metadata")) - parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null); - - addColumnsToCF((List<?>) row.get("cells"), columnFamily); - - if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1) - { - System.err - .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", - lineNumber, key); - return -1; - } - - // saving decorated key - writer.append(currentKey, columnFamily); - columnFamily.clear(); - - prevStoredKey = currentKey; - importedKeys++; - lineNumber++; - - long current = System.nanoTime(); - - if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs. 
- { - System.out.printf("Currently imported %d keys.%n", importedKeys); - start = current; - } - - if (keyCountToImport == importedKeys) - break; - - } - - writer.finish(true); - - return importedKeys; - } - } - - /** - * Get key validator for column family - * @param columnFamily column family instance - * @return key validator for given column family - */ - private AbstractType<?> getKeyValidator(ColumnFamily columnFamily) { - // this is a fix to support backward compatibility - // which allows to skip the current key validator - // please, take a look onto CASSANDRA-7498 for more details - if ("true".equals(System.getProperty("skip.key.validator", "false"))) { - return BytesType.instance; - } - return columnFamily.metadata().getKeyValidator(); - } - - /** - * Get JsonParser object for file - * @param fileName name of the file - * @return json parser instance for given file - * @throws IOException if any I/O error. - */ - private JsonParser getParser(String fileName) throws IOException - { - return factory.createJsonParser(new File(fileName)); - } - - /** - * Converts JSON to an SSTable file. JSON input can either be a file specified - * using an optional command line argument, or supplied on standard in. - * - * @param args command line arguments - * @throws ParseException on failure to parse JSON input - * @throws ConfigurationException on configuration error. - */ - public static void main(String[] args) throws ParseException, ConfigurationException - { - System.err.println("WARNING: please note that json2sstable is now deprecated and will be removed in Cassandra 3.0. " - + "You should use CQLSSTableWriter if you want to write sstables directly. " - + "Please see https://issues.apache.org/jira/browse/CASSANDRA-9618 for details."); - - CommandLineParser parser = new PosixParser(); - - try - { - cmd = parser.parse(options, args); - } - catch (org.apache.commons.cli.ParseException e) - { - System.err.println(e.getMessage()); - printProgramUsage(); - System.exit(1); - } - - if (cmd.getArgs().length != 2) - { - printProgramUsage(); - System.exit(1); - } - - String json = cmd.getArgs()[0]; - String ssTable = cmd.getArgs()[1]; - String keyspace = cmd.getOptionValue(KEYSPACE_OPTION); - String cfamily = cmd.getOptionValue(COLUMN_FAMILY_OPTION); - - Integer keyCountToImport = null; - boolean isSorted = false; - - if (cmd.hasOption(KEY_COUNT_OPTION)) - { - keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION)); - } - - if (cmd.hasOption(IS_SORTED_OPTION)) - { - isSorted = true; - } - - Schema.instance.loadFromDisk(false); - if (Schema.instance.getNonSystemKeyspaces().size() < 1) - { - String msg = "no non-system keyspaces are defined"; - System.err.println(msg); - throw new ConfigurationException(msg); - } - - try - { - new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable); - } - catch (Exception e) - { - JVMStabilityInspector.inspectThrowable(e); - e.printStackTrace(); - System.err.println("ERROR: " + e.getMessage()); - System.exit(-1); - } - - System.exit(0); - } - - private static void printProgramUsage() - { - System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n", - SSTableImport.class.getName()); - - System.out.println("Options:"); - for (Object o : options.getOptions()) - { - Option opt = (Option) o; - System.out.println(" -" +opt.getOpt() + " - " + opt.getDescription()); - } - } - - /** - * Convert a string to bytes (ByteBuffer) according to type - * @param content string to convert - * 
@param type type to use for conversion - * @return byte buffer representation of the given string - */ - private static ByteBuffer stringAsType(String content, AbstractType<?> type) - { - try - { - return type.fromString(content); - } - catch (MarshalException e) - { - throw new RuntimeException(e.getMessage()); - } - } - -}
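
Editor's note: the deleted SSTableImport above removes the json2sstable tool entirely; its own deprecation warning directs users to CQLSSTableWriter (see CASSANDRA-9618). As a point of reference, here is a minimal, hedged sketch of that replacement path using the org.apache.cassandra.io.sstable.CQLSSTableWriter builder API. The keyspace/table name, schema, insert statement, and output directory are illustrative assumptions, not values taken from this commit, and depending on the Cassandra version the builder may additionally require a partitioner to be configured.

    import org.apache.cassandra.io.sstable.CQLSSTableWriter;

    public class WriteSSTableExample
    {
        public static void main(String[] args) throws Exception
        {
            // Illustrative schema and prepared insert; substitute your own table definition.
            String schema = "CREATE TABLE demo.users (id text PRIMARY KEY, age int)";
            String insert = "INSERT INTO demo.users (id, age) VALUES (?, ?)";

            // Output directory is assumed to follow the <keyspace>/<table> layout
            // expected by sstableloader; adjust the path for your environment.
            CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                      .inDirectory("/tmp/demo/users")
                                                      .forTable(schema)
                                                      .using(insert)
                                                      .build();
            try
            {
                // Each addRow call binds the prepared insert's parameters in order.
                writer.addRow("alice", 30);
                writer.addRow("bob", 25);
            }
            finally
            {
                // Closing flushes the in-progress SSTable to the target directory.
                writer.close();
            }
        }
    }

The resulting SSTable files can then be streamed into a cluster with sstableloader, which is the workflow the removed json2sstable/SSTableImport tool previously covered.
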
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/tracing/TraceKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java index f66269d..f4d6450 100644 --- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java +++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java @@ -25,9 +25,8 @@ import com.google.common.collect.ImmutableMap; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.CFRowAdder; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@ -85,44 +84,36 @@ public final class TraceKeyspace String command, int ttl) { - Mutation mutation = new Mutation(NAME, sessionId); - ColumnFamily cells = mutation.addOrGet(TraceKeyspace.Sessions); + RowUpdateBuilder adder = new RowUpdateBuilder(Sessions, FBUtilities.timestampMicros(), ttl, sessionId) + .clustering() + .add("client", client) + .add("coordinator", FBUtilities.getBroadcastAddress()) + .add("request", request) + .add("started_at", new Date(startedAt)) + .add("command", command); - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl); - adder.add("client", client) - .add("coordinator", FBUtilities.getBroadcastAddress()) - .add("request", request) - .add("started_at", new Date(startedAt)) - .add("command", command); for (Map.Entry<String, String> entry : parameters.entrySet()) adder.addMapEntry("parameters", entry.getKey(), entry.getValue()); - - return mutation; + return adder.build(); } static Mutation makeStopSessionMutation(ByteBuffer sessionId, int elapsed, int ttl) { - Mutation mutation = new Mutation(NAME, sessionId); - ColumnFamily cells = mutation.addOrGet(Sessions); - - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl); - adder.add("duration", elapsed); - - return mutation; + return new RowUpdateBuilder(Sessions, FBUtilities.timestampMicros(), ttl, sessionId) + .clustering() + .add("duration", elapsed) + .build(); } static Mutation makeEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName, int ttl) { - Mutation mutation = new Mutation(NAME, sessionId); - ColumnFamily cells = mutation.addOrGet(Events); - - CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros(), ttl); - adder.add("activity", message) - .add("source", FBUtilities.getBroadcastAddress()) - .add("thread", threadName); + RowUpdateBuilder adder = new RowUpdateBuilder(Events, FBUtilities.timestampMicros(), ttl, sessionId) + .clustering(UUIDGen.getTimeUUID()); + adder.add("activity", message); + adder.add("source", FBUtilities.getBroadcastAddress()); + adder.add("thread", threadName); if (elapsed >= 0) adder.add("source_elapsed", elapsed); - - return mutation; + return adder.build(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 4e21678..4e54e46 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -141,6 +141,6 @@ public class QueryMessage extends Message.Request @Override public String toString() { - return "QUERY " + query; + return "QUERY " + query + "[pageSize = " + options.getPageSize() + "]"; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/triggers/ITrigger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/ITrigger.java b/src/java/org/apache/cassandra/triggers/ITrigger.java index 21aba05..ad631d1 100644 --- a/src/java/org/apache/cassandra/triggers/ITrigger.java +++ b/src/java/org/apache/cassandra/triggers/ITrigger.java @@ -24,11 +24,11 @@ package org.apache.cassandra.triggers; import java.nio.ByteBuffer; import java.util.Collection; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.Partition; /** - * Trigger interface, For every Mutation received by the coordinator {@link #augment(ByteBuffer, ColumnFamily)} + * Trigger interface, For every partition update received by the coordinator {@link #augment(Partition)} * is called.<p> * * <b> Contract:</b><br> @@ -44,9 +44,8 @@ public interface ITrigger /** * Called exactly once per CF update, returned mutations are atomically updated. * - * @param partitionKey - partition Key for the update. * @param update - update received for the CF * @return additional modifications to be applied along with the supplied update */ - public Collection<Mutation> augment(ByteBuffer partitionKey, ColumnFamily update); + public Collection<Mutation> augment(Partition update); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/triggers/TriggerExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java index 973ad8b..071a973 100644 --- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java +++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java @@ -22,13 +22,16 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.*; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import org.apache.cassandra.config.TriggerDefinition; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; @@ -62,7 +65,7 @@ public class TriggerExecutor /** * Augment a partition update by executing triggers to generate an intermediate - * set of mutations, then merging the ColumnFamily from each mutation with those + * set of mutations, then merging the update from each mutation with those * supplied. This is called from @{link org.apache.cassandra.service.StorageProxy#cas} * which is scoped for a single partition. 
For that reason, any mutations generated * by triggers are checked to ensure that they are for the same table and partition @@ -77,22 +80,13 @@ public class TriggerExecutor * @throws InvalidRequestException if any mutation generated by a trigger does not * apply to the exact same partition as the initial update */ - public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException + public PartitionUpdate execute(PartitionUpdate updates) throws InvalidRequestException { - List<Mutation> intermediate = executeInternal(key, updates); + List<Mutation> intermediate = executeInternal(updates); if (intermediate == null || intermediate.isEmpty()) return updates; - validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate); - - for (Mutation mutation : intermediate) - { - for (ColumnFamily cf : mutation.getColumnFamilies()) - { - updates.addAll(cf); - } - } - return updates; + return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().cfId, updates.partitionKey(), intermediate)); } /** @@ -120,9 +114,9 @@ public class TriggerExecutor if (mutation instanceof CounterMutation) hasCounters = true; - for (ColumnFamily cf : mutation.getColumnFamilies()) + for (PartitionUpdate upd : mutation.getPartitionUpdates()) { - List<Mutation> augmentations = executeInternal(mutation.key(), cf); + List<Mutation> augmentations = executeInternal(upd); if (augmentations == null || augmentations.isEmpty()) continue; @@ -148,54 +142,66 @@ public class TriggerExecutor private Collection<Mutation> mergeMutations(Iterable<Mutation> mutations) { - Map<Pair<String, ByteBuffer>, Mutation> groupedMutations = new HashMap<>(); + ListMultimap<Pair<String, ByteBuffer>, Mutation> groupedMutations = ArrayListMultimap.create(); for (Mutation mutation : mutations) { - Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key()); - Mutation current = groupedMutations.get(key); - if (current == null) - { - // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap(). 
- groupedMutations.put(key, mutation.copy()); - } - else - { - current.addAll(mutation); - } + Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key().getKey()); + groupedMutations.put(key, mutation); } - return groupedMutations.values(); + List<Mutation> merged = new ArrayList<>(groupedMutations.size()); + for (Pair<String, ByteBuffer> key : groupedMutations.keySet()) + merged.add(Mutation.merge(groupedMutations.get(key))); + + return merged; } - private void validateForSinglePartition(AbstractType<?> keyValidator, - UUID cfId, - ByteBuffer key, - Collection<Mutation> tmutations) + private Collection<PartitionUpdate> validateForSinglePartition(UUID cfId, + DecoratedKey key, + Collection<Mutation> tmutations) throws InvalidRequestException { - for (Mutation mutation : tmutations) + validate(tmutations); + + if (tmutations.size() == 1) { - if (keyValidator.compare(mutation.key(), key) != 0) - throw new InvalidRequestException("Partition key of additional mutation does not match primary update key"); + Collection<PartitionUpdate> updates = Iterables.getOnlyElement(tmutations).getPartitionUpdates(); + if (updates.size() > 1) + throw new InvalidRequestException("The updates generated by triggers are not all for the same partition"); + validateSamePartition(cfId, key, Iterables.getOnlyElement(updates)); + return updates; + } - for (ColumnFamily cf : mutation.getColumnFamilies()) + ArrayList<PartitionUpdate> updates = new ArrayList<>(tmutations.size()); + for (Mutation mutation : tmutations) + { + for (PartitionUpdate update : mutation.getPartitionUpdates()) { - if (! cf.id().equals(cfId)) - throw new InvalidRequestException("table of additional mutation does not match primary update table"); + validateSamePartition(cfId, key, update); + updates.add(update); } } - validate(tmutations); + return updates; + } + + private void validateSamePartition(UUID cfId, DecoratedKey key, PartitionUpdate update) + throws InvalidRequestException + { + if (!key.equals(update.partitionKey())) + throw new InvalidRequestException("Partition key of additional mutation does not match primary update key"); + + if (!cfId.equals(update.metadata().cfId)) + throw new InvalidRequestException("table of additional mutation does not match primary update table"); } private void validate(Collection<Mutation> tmutations) throws InvalidRequestException { for (Mutation mutation : tmutations) { - QueryProcessor.validateKey(mutation.key()); - for (ColumnFamily tcf : mutation.getColumnFamilies()) - for (Cell cell : tcf) - cell.validateFields(tcf.metadata()); + QueryProcessor.validateKey(mutation.key().getKey()); + for (PartitionUpdate update : mutation.getPartitionUpdates()) + update.validate(); } } @@ -203,9 +209,9 @@ public class TriggerExecutor * Switch class loader before using the triggers for the column family, if * not loaded them with the custom class loader. 
*/ - private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily) + private List<Mutation> executeInternal(PartitionUpdate update) { - Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers(); + Map<String, TriggerDefinition> triggers = update.metadata().getTriggers(); if (triggers.isEmpty()) return null; List<Mutation> tmutations = Lists.newLinkedList(); @@ -220,7 +226,7 @@ public class TriggerExecutor trigger = loadTriggerInstance(td.classOption); cachedTriggers.put(td.classOption, trigger); } - Collection<Mutation> temp = trigger.augment(key, columnFamily); + Collection<Mutation> temp = trigger.augment(update); if (temp != null) tmutations.addAll(temp); } @@ -228,7 +234,7 @@ public class TriggerExecutor } catch (Exception ex) { - throw new RuntimeException(String.format("Exception while creating trigger on table with ID: %s", columnFamily.id()), ex); + throw new RuntimeException(String.format("Exception while creating trigger on table with ID: %s", update.metadata().cfId), ex); } finally { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/ByteBufferUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java index 1831c19..69915bf 100644 --- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java +++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.UUID; import net.nicoulaj.compilecommand.annotations.Inline; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; @@ -322,6 +323,12 @@ public class ByteBufferUtil return ByteBufferUtil.read(in, length); } + public static int serializedSizeWithLength(ByteBuffer buffer, TypeSizes sizes) + { + int size = buffer.remaining(); + return sizes.sizeof(size) + size; + } + /* @return An unsigned short in an integer. */ public static int readShortLength(DataInput in) throws IOException { @@ -338,16 +345,21 @@ public class ByteBufferUtil return ByteBufferUtil.read(in, readShortLength(in)); } + public static int serializedSizeWithShortLength(ByteBuffer buffer, TypeSizes sizes) + { + int size = buffer.remaining(); + return sizes.sizeof((short)size) + size; + } + /** * @param in data input * @return null * @throws IOException if an I/O error occurs. 
*/ - public static ByteBuffer skipShortLength(DataInput in) throws IOException + public static void skipShortLength(DataInput in) throws IOException { int skip = readShortLength(in); FileUtils.skipBytesFully(in, skip); - return null; } public static ByteBuffer read(DataInput in, int length) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 17edeb0..b0bed7b 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -372,6 +372,11 @@ public class FBUtilities return System.currentTimeMillis() * 1000; } + public static int nowInSeconds() + { + return (int)(System.currentTimeMillis() / 1000); + } + public static void waitOnFutures(Iterable<Future<?>> futures) { for (Future f : futures) @@ -502,13 +507,18 @@ public class FBUtilities } } - public static <T> SortedSet<T> singleton(T column, Comparator<? super T> comparator) + public static <T> NavigableSet<T> singleton(T column, Comparator<? super T> comparator) { - SortedSet<T> s = new TreeSet<T>(comparator); + NavigableSet<T> s = new TreeSet<T>(comparator); s.add(column); return s; } + public static <T> NavigableSet<T> emptySortedSet(Comparator<? super T> comparator) + { + return new TreeSet<T>(comparator); + } + public static String toString(Map<?,?> map) { Joiner.MapJoiner joiner = Joiner.on(", ").withKeyValueSeparator(":"); @@ -796,4 +806,30 @@ public class FBUtilities digest.update((byte) ((val >>> 8) & 0xFF)); digest.update((byte) ((val >>> 0) & 0xFF)); } + + public static void updateWithBoolean(MessageDigest digest, boolean val) + { + updateWithByte(digest, val ? 0 : 1); + } + + public static void closeAll(List<? extends AutoCloseable> l) throws Exception + { + Exception toThrow = null; + for (AutoCloseable c : l) + { + try + { + c.close(); + } + catch (Exception e) + { + if (toThrow == null) + toThrow = e; + else + toThrow.addSuppressed(e); + } + } + if (toThrow != null) + throw toThrow; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/MergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java index e61326e..d0f116e 100644 --- a/src/java/org/apache/cassandra/utils/MergeIterator.java +++ b/src/java/org/apache/cassandra/utils/MergeIterator.java @@ -18,7 +18,6 @@ package org.apache.cassandra.utils; import java.io.Closeable; -import java.io.IOException; import java.util.*; import com.google.common.collect.AbstractIterator; @@ -35,9 +34,9 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem this.reducer = reducer; } - public static <In, Out> IMergeIterator<In, Out> get(List<? extends Iterator<In>> sources, - Comparator<In> comparator, - Reducer<In, Out> reducer) + public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources, + Comparator<? 
super In> comparator, + Reducer<In, Out> reducer) { if (sources.size() == 1) { @@ -59,9 +58,10 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem { try { - ((Closeable)iterator).close(); + if (iterator instanceof AutoCloseable) + ((AutoCloseable)iterator).close(); } - catch (IOException e) + catch (Exception e) { throw new RuntimeException(e); } @@ -79,13 +79,13 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem // TODO: if we had our own PriorityQueue implementation we could stash items // at the end of its array, so we wouldn't need this storage protected final ArrayDeque<Candidate<In>> candidates; - public ManyToOne(List<? extends Iterator<In>> iters, Comparator<In> comp, Reducer<In, Out> reducer) + public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer) { super(iters, reducer); this.queue = new PriorityQueue<>(Math.max(1, iters.size())); - for (Iterator<In> iter : iters) + for (int i = 0; i < iters.size(); i++) { - Candidate<In> candidate = new Candidate<>(iter, comp); + Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp); if (!candidate.advance()) // was empty continue; @@ -111,7 +111,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem { candidate = queue.poll(); candidates.push(candidate); - reducer.reduce(candidate.item); + reducer.reduce(candidate.idx, candidate.item); } while (queue.peek() != null && queue.peek().compareTo(candidate) == 0); return reducer.getReduced(); @@ -130,14 +130,16 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem // Holds and is comparable by the head item of an iterator it owns protected static final class Candidate<In> implements Comparable<Candidate<In>> { - private final Iterator<In> iter; - private final Comparator<In> comp; + private final Iterator<? extends In> iter; + private final Comparator<? super In> comp; + private final int idx; private In item; - public Candidate(Iterator<In> iter, Comparator<In> comp) + public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp) { this.iter = iter; this.comp = comp; + this.idx = idx; } /** @return True if our iterator had an item, and it is now available */ @@ -170,7 +172,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem * combine this object with the previous ones. * intermediate state is up to your implementation. 
*/ - public abstract void reduce(In current); + public abstract void reduce(int idx, In current); /** @return The last object computed by reduce */ protected abstract Out getReduced(); @@ -202,7 +204,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem if (!source.hasNext()) return endOfData(); reducer.onKeyChange(); - reducer.reduce(source.next()); + reducer.reduce(0, source.next()); return reducer.getReduced(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java index 5448390..f2663bf 100644 --- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java +++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java @@ -21,13 +21,13 @@ import java.net.InetAddress; import java.util.*; import com.datastax.driver.core.*; + +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.CompactTables; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.CellNames; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableLoader; @@ -100,27 +100,70 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client { Map<String, CFMetaData> tables = new HashMap<>(); - String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense FROM %s.%s WHERE keyspace_name = '%s'", + String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense, default_validator FROM %s.%s WHERE keyspace_name = '%s'", SystemKeyspace.NAME, LegacySchemaTables.COLUMNFAMILIES, keyspace); + + // The following is a slightly simplified but otherwise duplicated version of LegacySchemaTables.createTableFromTableRowAndColumnRows. It might + // be safer to have a simple wrapper of the driver ResultSet/Row implementing UntypedResultSet/UntypedResultSet.Row and reuse the original method. for (Row row : session.execute(query)) { String name = row.getString("columnfamily_name"); UUID id = row.getUUID("cf_id"); - ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type")); + boolean isSuper = row.getString("type").toLowerCase().equals("super"); AbstractType rawComparator = TypeParser.parse(row.getString("comparator")); AbstractType subComparator = row.isNull("subcomparator") ? 
null : TypeParser.parse(row.getString("subcomparator")); boolean isDense = row.getBool("is_dense"); - CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator, subComparator), - isDense); + boolean isCompound = rawComparator instanceof CompositeType; + + AbstractType<?> defaultValidator = TypeParser.parse(row.getString("default_validator")); + boolean isCounter = defaultValidator instanceof CounterColumnType; + boolean isCQLTable = !isSuper && !isDense && isCompound; - tables.put(name, new CFMetaData(keyspace, name, type, comparator, id)); + String columnsQuery = String.format("SELECT column_name, component_index, type, validator FROM %s.%s WHERE keyspace_name='%s' AND columnfamily_name='%s'", + SystemKeyspace.NAME, + LegacySchemaTables.COLUMNS, + keyspace, + name); + + List<ColumnDefinition> defs = new ArrayList<>(); + for (Row colRow : session.execute(columnsQuery)) + defs.add(createDefinitionFromRow(colRow, keyspace, name, rawComparator, subComparator, isSuper, isCQLTable)); + + tables.put(name, CFMetaData.create(keyspace, name, id, isDense, isCompound, isSuper, isCounter, defs)); } return tables; } + + // A slightly simplified version of LegacySchemaTables. + private static ColumnDefinition createDefinitionFromRow(Row row, + String keyspace, + String table, + AbstractType<?> rawComparator, + AbstractType<?> rawSubComparator, + boolean isSuper, + boolean isCQLTable) + { + ColumnDefinition.Kind kind = LegacySchemaTables.deserializeKind(row.getString("type")); + + Integer componentIndex = null; + if (!row.isNull("component_index")) + componentIndex = row.getInt("component_index"); + + // Note: we save the column name as string, but we should not assume that it is an UTF8 name, we + // we need to use the comparator fromString method + AbstractType<?> comparator = isCQLTable + ? UTF8Type.instance + : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator); + ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator); + + AbstractType<?> validator = TypeParser.parse(row.getString("validator")); + + return new ColumnDefinition(keyspace, table, name, validator, null, null, null, componentIndex, kind); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/Sorting.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/Sorting.java b/src/java/org/apache/cassandra/utils/Sorting.java new file mode 100644 index 0000000..b1c0b46 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Sorting.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils; + +public abstract class Sorting +{ + private Sorting() {} + + /** + * Interface that allows to sort elements addressable by index, but without actually requiring those + * to elements to be part of a list/array. + */ + public interface Sortable + { + /** + * The number of elements to sort. + */ + public int size(); + + /** + * Compares the element with index i should sort before the element with index j. + */ + public int compare(int i, int j); + + /** + * Swaps element i and j. + */ + public void swap(int i, int j); + } + + /** + * Sort a sortable. + * + * The actual algorithm is a direct adaptation of the standard sorting in golang + * at http://golang.org/src/pkg/sort/sort.go (comments included). + * + * It makes one call to data.Len to determine n, and O(n*log(n)) calls to + * data.Less and data.Swap. The sort is not guaranteed to be stable. + */ + public static void sort(Sortable data) + { + // Switch to heapsort if depth of 2*ceil(lg(n+1)) is reached. + int n = data.size(); + int maxDepth = 0; + for (int i = n; i > 0; i >>= 1) + maxDepth++; + maxDepth *= 2; + quickSort(data, 0, n, maxDepth); + } + + private static void insertionSort(Sortable data, int a, int b) + { + for (int i = a + 1; i < b; i++) + for(int j = i; j > a && data.compare(j, j-1) < 0; j--) + data.swap(j, j-1); + } + + // siftDown implements the heap property on data[lo, hi). + // first is an offset into the array where the root of the heap lies. + private static void siftDown(Sortable data, int lo, int hi, int first) + { + int root = lo; + while (true) + { + int child = 2*root + 1; + if (child >= hi) + return; + + if (child + 1 < hi && data.compare(first+child, first+child+1) < 0) + child++; + + if (data.compare(first+root, first+child) >= 0) + return; + + data.swap(first+root, first+child); + root = child; + } + } + + private static void heapSort(Sortable data, int a, int b) + { + int first = a; + int lo = 0; + int hi = b - a; + + // Build heap with greatest element at top. + for (int i = (hi - 1) / 2; i >= 0; i--) + siftDown(data, i, hi, first); + + // Pop elements, largest first, into end of data. + for (int i = hi - 1; i >= 0; i--) { + data.swap(first, first+i); + siftDown(data, lo, i, first); + } + } + + // Quicksort, following Bentley and McIlroy, + // ``Engineering a Sort Function,'' SP&E November 1993. + + // medianOfThree moves the median of the three values data[a], data[b], data[c] into data[a]. + private static void medianOfThree(Sortable data, int a, int b, int c) + { + int m0 = b; + int m1 = a; + int m2 = c; + // bubble sort on 3 elements + if (data.compare(m1, m0) < 0) + data.swap(m1, m0); + if (data.compare(m2, m1) < 0) + data.swap(m2, m1); + if (data.compare(m1, m0) < 0) + data.swap(m1, m0); + // now data[m0] <= data[m1] <= data[m2] + } + + private static void swapRange(Sortable data, int a, int b, int n) + { + for (int i = 0; i < n; i++) + data.swap(a+i, b+i); + } + + private static void doPivot(Sortable data, int lo, int hi, int[] result) + { + int m = lo + (hi-lo)/2; // Written like this to avoid integer overflow. + if (hi-lo > 40) { + // Tukey's ``Ninther,'' median of three medians of three. 
+ int s = (hi - lo) / 8; + medianOfThree(data, lo, lo+s, lo+2*s); + medianOfThree(data, m, m-s, m+s); + medianOfThree(data, hi-1, hi-1-s, hi-1-2*s); + } + medianOfThree(data, lo, m, hi-1); + + // Invariants are: + // data[lo] = pivot (set up by ChoosePivot) + // data[lo <= i < a] = pivot + // data[a <= i < b] < pivot + // data[b <= i < c] is unexamined + // data[c <= i < d] > pivot + // data[d <= i < hi] = pivot + // + // Once b meets c, can swap the "= pivot" sections + // into the middle of the slice. + int pivot = lo; + int a = lo+1, b = lo+1, c = hi, d =hi; + while (true) + { + while (b < c) + { + int cmp = data.compare(b, pivot); + if (cmp < 0) // data[b] < pivot + { + b++; + } + else if (cmp == 0) // data[b] = pivot + { + data.swap(a, b); + a++; + b++; + } + else + { + break; + } + } + + while (b < c) + { + int cmp = data.compare(pivot, c-1); + if (cmp < 0) // data[c-1] > pivot + { + c--; + } + else if (cmp == 0) // data[c-1] = pivot + { + data.swap(c-1, d-1); + c--; + d--; + } + else + { + break; + } + } + + if (b >= c) + break; + + // data[b] > pivot; data[c-1] < pivot + data.swap(b, c-1); + b++; + c--; + } + + int n = Math.min(b-a, a-lo); + swapRange(data, lo, b-n, n); + + n = Math.min(hi-d, d-c); + swapRange(data, c, hi-n, n); + + result[0] = lo + b - a; + result[1] = hi - (d - c); + } + + private static void quickSort(Sortable data, int a, int b, int maxDepth) + { + int[] buffer = new int[2]; + + while (b-a > 7) + { + if (maxDepth == 0) + { + heapSort(data, a, b); + return; + } + + maxDepth--; + + doPivot(data, a, b, buffer); + int mlo = buffer[0]; + int mhi = buffer[1]; + // Avoiding recursion on the larger subproblem guarantees + // a stack depth of at most lg(b-a). + if (mlo-a < b-mhi) + { + quickSort(data, a, mlo, maxDepth); + a = mhi; // i.e., quickSort(data, mhi, b) + } + else + { + quickSort(data, mhi, b, maxDepth); + b = mlo; // i.e., quickSort(data, a, mlo) + } + } + + if (b-a > 1) + insertionSort(data, a, b); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index 1145d12..bf68ffa 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -26,8 +26,6 @@ import java.util.Queue; import org.apache.cassandra.utils.ObjectSizes; -import static org.apache.cassandra.utils.btree.UpdateFunction.NoOp; - public class BTree { /** @@ -79,21 +77,18 @@ public class BTree return EMPTY_LEAF; } - public static <V> Object[] build(Collection<V> source, Comparator<V> comparator, boolean sorted, UpdateFunction<V> updateF) + public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF) { - return build(source, source.size(), comparator, sorted, updateF); + return build(source, source.size(), updateF); } /** * Creates a BTree containing all of the objects in the provided collection * * @param source the items to build the tree with - * @param comparator the comparator that defines the ordering over the items in the tree - * @param sorted if false, the collection will be copied and sorted to facilitate construction - * @param <V> * @return */ - public static <V> Object[] build(Iterable<V> source, int size, Comparator<V> comparator, boolean sorted, UpdateFunction<V> updateF) + public static <C, K extends C, V extends C> 
Object[] build(Iterable<K> source, int size, UpdateFunction<K, V> updateF) { if (size < FAN_FACTOR) { @@ -101,27 +96,13 @@ public class BTree V[] values = (V[]) new Object[size + (size & 1)]; { int i = 0; - for (V v : source) - values[i++] = v; - } - - // inline sorting since we're already calling toArray - if (!sorted) - Arrays.sort(values, 0, size, comparator); - - // if updateF is specified - if (updateF != null) - { - for (int i = 0 ; i < size ; i++) - values[i] = updateF.apply(values[i]); - updateF.allocated(ObjectSizes.sizeOfArray(values)); + for (K k : source) + values[i++] = updateF.apply(k); } + updateF.allocated(ObjectSizes.sizeOfArray(values)); return values; } - if (!sorted) - source = sorted(source, comparator, size); - Queue<Builder> queue = modifier.get(); Builder builder = queue.poll(); if (builder == null) @@ -131,28 +112,12 @@ public class BTree return btree; } - /** - * Returns a new BTree with the provided set inserting/replacing as necessary any equal items - * - * @param btree the tree to update - * @param comparator the comparator that defines the ordering over the items in the tree - * @param updateWith the items to either insert / update - * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction - * @param <V> - * @return - */ - public static <V> Object[] update(Object[] btree, Comparator<V> comparator, Collection<V> updateWith, boolean updateWithIsSorted) - { - return update(btree, comparator, updateWith, updateWithIsSorted, NoOp.<V>instance()); - } - - public static <V> Object[] update(Object[] btree, - Comparator<V> comparator, - Collection<V> updateWith, - boolean updateWithIsSorted, - UpdateFunction<V> updateF) + public static <C, K extends C, V extends C> Object[] update(Object[] btree, + Comparator<C> comparator, + Collection<K> updateWith, + UpdateFunction<K, V> updateF) { - return update(btree, comparator, updateWith, updateWith.size(), updateWithIsSorted, updateF); + return update(btree, comparator, updateWith, updateWith.size(), updateF); } /** @@ -161,23 +126,19 @@ public class BTree * @param btree the tree to update * @param comparator the comparator that defines the ordering over the items in the tree * @param updateWith the items to either insert / update - * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction + * @param updateWithLength then number of elements in updateWith * @param updateF the update function to apply to any pairs we are swapping, and maybe abort early * @param <V> * @return */ - public static <V> Object[] update(Object[] btree, - Comparator<V> comparator, - Iterable<V> updateWith, - int updateWithLength, - boolean updateWithIsSorted, - UpdateFunction<V> updateF) + public static <C, K extends C, V extends C> Object[] update(Object[] btree, + Comparator<C> comparator, + Iterable<K> updateWith, + int updateWithLength, + UpdateFunction<K, V> updateF) { if (btree.length == 0) - return build(updateWith, updateWithLength, comparator, updateWithIsSorted, updateF); - - if (!updateWithIsSorted) - updateWith = sorted(updateWith, comparator, updateWithLength); + return build(updateWith, updateWithLength, updateF); Queue<Builder> queue = modifier.get(); Builder builder = queue.poll(); @@ -361,17 +322,6 @@ public class BTree } }; - // return a sorted collection - private static <V> Collection<V> sorted(Iterable<V> source, Comparator<V> comparator, int size) - { - V[] vs = (V[]) new Object[size]; - int i = 0; - for (V v : source) - vs[i++] = v; - 
Arrays.sort(vs, comparator); - return Arrays.asList(vs); - } - /** simple static wrapper to calls to cmp.compare() which checks if either a or b are Special (i.e. represent an infinity) */ // TODO : cheaper to check for POSITIVE/NEGATIVE infinity in callers, rather than here static <V> int compare(Comparator<V> cmp, Object a, Object b) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java index 7a83238..403f234 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java +++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java @@ -25,43 +25,76 @@ import static org.apache.cassandra.utils.btree.BTree.getKeyEnd; public class BTreeSearchIterator<CK, K extends CK, V> extends Path implements SearchIterator<K, V> { - final Comparator<CK> comparator; - public BTreeSearchIterator(Object[] btree, Comparator<CK> comparator) + final boolean forwards; + + public BTreeSearchIterator(Object[] btree, Comparator<CK> comparator, boolean forwards) { init(btree); + if (!forwards) + this.indexes[0] = (byte)(getKeyEnd(path[0]) - 1); this.comparator = comparator; + this.forwards = forwards; } public V next(K target) { - while (depth > 0) + // We could probably avoid some of the repetition but leaving that for later. + if (forwards) { - byte successorParentDepth = findSuccessorParentDepth(); - if (successorParentDepth < 0) - break; // we're in last section of tree, so can only search down - int successorParentIndex = indexes[successorParentDepth] + 1; - Object[] successParentNode = path[successorParentDepth]; - Object successorParentKey = successParentNode[successorParentIndex]; - int c = BTree.compare(comparator, target, successorParentKey); - if (c < 0) - break; - if (c == 0) + while (depth > 0) { + byte successorParentDepth = findSuccessorParentDepth(); + if (successorParentDepth < 0) + break; // we're in last section of tree, so can only search down + int successorParentIndex = indexes[successorParentDepth] + 1; + Object[] successParentNode = path[successorParentDepth]; + Object successorParentKey = successParentNode[successorParentIndex]; + int c = BTree.compare(comparator, target, successorParentKey); + if (c < 0) + break; + if (c == 0) + { + depth = successorParentDepth; + indexes[successorParentDepth]++; + return (V) successorParentKey; + } depth = successorParentDepth; indexes[successorParentDepth]++; - return (V) successorParentKey; } - depth = successorParentDepth; - indexes[successorParentDepth]++; + if (find(comparator, target, Op.CEIL, true)) + return (V) currentKey(); + } + else + { + while (depth > 0) + { + byte predecessorParentDepth = findPredecessorParentDepth(); + if (predecessorParentDepth < 0) + break; // we're in last section of tree, so can only search down + int predecessorParentIndex = indexes[predecessorParentDepth] - 1; + Object[] predecessParentNode = path[predecessorParentDepth]; + Object predecessorParentKey = predecessParentNode[predecessorParentIndex]; + int c = BTree.compare(comparator, target, predecessorParentKey); + if (c > 0) + break; + if (c == 0) + { + depth = predecessorParentDepth; + indexes[predecessorParentDepth]--; + return (V) predecessorParentKey; + } + depth = predecessorParentDepth; + indexes[predecessorParentDepth]--; + } + if 
(find(comparator, target, Op.FLOOR, false)) + return (V) currentKey(); } - if (find(comparator, target, Op.CEIL, true)) - return (V) currentKey(); return null; } public boolean hasNext() { - return depth != 0 || indexes[0] != getKeyEnd(path[0]); + return depth != 0 || indexes[0] != (forwards ? getKeyEnd(path[0]) : -1); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTreeSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java deleted file mode 100644 index d80b32e..0000000 --- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cassandra.utils.btree; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.NavigableSet; -import java.util.SortedSet; - -public class BTreeSet<V> implements NavigableSet<V> -{ - protected final Comparator<V> comparator; - protected final Object[] tree; - - public BTreeSet(Object[] tree, Comparator<V> comparator) - { - this.tree = tree; - this.comparator = comparator; - } - - public BTreeSet<V> update(Collection<V> updateWith, boolean isSorted) - { - return new BTreeSet<>(BTree.update(tree, comparator, updateWith, isSorted, UpdateFunction.NoOp.<V>instance()), comparator); - } - - @Override - public Comparator<? 
super V> comparator() - { - return comparator; - } - - protected Cursor<V, V> slice(boolean forwards, boolean permitInversion) - { - return BTree.slice(tree, forwards); - } - - @Override - public int size() - { - return slice(true, false).count(); - } - - @Override - public boolean isEmpty() - { - return slice(true, false).hasNext(); - } - - @Override - public Iterator<V> iterator() - { - return slice(true, true); - } - - @Override - public Iterator<V> descendingIterator() - { - return slice(false, true); - } - - @Override - public Object[] toArray() - { - return toArray(new Object[0]); - } - - @Override - public <T> T[] toArray(T[] a) - { - int size = size(); - if (a.length < size) - a = Arrays.copyOf(a, size); - int i = 0; - for (V v : this) - a[i++] = (T) v; - return a; - } - - @Override - public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive) - { - return new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive); - } - - @Override - public NavigableSet<V> headSet(V toElement, boolean inclusive) - { - return new BTreeRange<>(tree, comparator, null, true, toElement, inclusive); - } - - @Override - public NavigableSet<V> tailSet(V fromElement, boolean inclusive) - { - return new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true); - } - - @Override - public SortedSet<V> subSet(V fromElement, V toElement) - { - return subSet(fromElement, true, toElement, false); - } - - @Override - public SortedSet<V> headSet(V toElement) - { - return headSet(toElement, false); - } - - @Override - public SortedSet<V> tailSet(V fromElement) - { - return tailSet(fromElement, true); - } - - @Override - public V first() - { - throw new UnsupportedOperationException(); - } - - @Override - public V last() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean addAll(Collection<? 
extends V> c) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection<?> c) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean removeAll(Collection<?> c) - { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() - { - throw new UnsupportedOperationException(); - } - - @Override - public V pollFirst() - { - throw new UnsupportedOperationException(); - } - - @Override - public V pollLast() - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean add(V v) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean remove(Object o) - { - throw new UnsupportedOperationException(); - } - - @Override - public V lower(V v) - { - throw new UnsupportedOperationException(); - } - - @Override - public V floor(V v) - { - throw new UnsupportedOperationException(); - } - - @Override - public V ceiling(V v) - { - throw new UnsupportedOperationException(); - } - - @Override - public V higher(V v) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean contains(Object o) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection<?> c) - { - throw new UnsupportedOperationException(); - } - - @Override - public NavigableSet<V> descendingSet() - { - return new BTreeRange<>(this.tree, this.comparator).descendingSet(); - } - - public static class BTreeRange<V> extends BTreeSet<V> implements NavigableSet<V> - { - - protected final V lowerBound, upperBound; - protected final boolean inclusiveLowerBound, inclusiveUpperBound; - - BTreeRange(Object[] tree, Comparator<V> comparator) - { - this(tree, comparator, null, true, null, true); - } - - BTreeRange(BTreeRange<V> from) - { - this(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound); - } - - BTreeRange(Object[] tree, Comparator<V> comparator, V lowerBound, boolean inclusiveLowerBound, V upperBound, boolean inclusiveUpperBound) - { - super(tree, comparator); - this.lowerBound = lowerBound; - this.upperBound = upperBound; - this.inclusiveLowerBound = inclusiveLowerBound; - this.inclusiveUpperBound = inclusiveUpperBound; - } - - // narrowing range constructor - makes this the intersection of the two ranges over the same tree b - BTreeRange(BTreeRange<V> a, BTreeRange<V> b) - { - super(a.tree, a.comparator); - assert a.tree == b.tree; - final BTreeRange<V> lb, ub; - - if (a.lowerBound == null) - { - lb = b; - } - else if (b.lowerBound == null) - { - lb = a; - } - else - { - int c = comparator.compare(a.lowerBound, b.lowerBound); - if (c < 0) - lb = b; - else if (c > 0) - lb = a; - else if (!a.inclusiveLowerBound) - lb = a; - else - lb = b; - } - - if (a.upperBound == null) - { - ub = b; - } - else if (b.upperBound == null) - { - ub = a; - } - else - { - int c = comparator.compare(b.upperBound, a.upperBound); - if (c < 0) - ub = b; - else if (c > 0) - ub = a; - else if (!a.inclusiveUpperBound) - ub = a; - else - ub = b; - } - - lowerBound = lb.lowerBound; - inclusiveLowerBound = lb.inclusiveLowerBound; - upperBound = ub.upperBound; - inclusiveUpperBound = ub.inclusiveUpperBound; - } - - @Override - protected Cursor<V, V> slice(boolean forwards, boolean permitInversion) - { - return BTree.slice(tree, comparator, lowerBound, inclusiveLowerBound, upperBound, inclusiveUpperBound, forwards); - } - - @Override - public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, 
V toElement, boolean toInclusive) - { - return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive)); - } - - @Override - public NavigableSet<V> headSet(V toElement, boolean inclusive) - { - return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, lowerBound, true, toElement, inclusive)); - } - - @Override - public NavigableSet<V> tailSet(V fromElement, boolean inclusive) - { - return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true)); - } - - @Override - public NavigableSet<V> descendingSet() - { - return new BTreeDescRange<>(this); - } - } - - public static class BTreeDescRange<V> extends BTreeRange<V> - { - BTreeDescRange(BTreeRange<V> from) - { - super(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound); - } - - @Override - protected Cursor<V, V> slice(boolean forwards, boolean permitInversion) - { - return super.slice(permitInversion ? !forwards : forwards, false); - } - - @Override - public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive) - { - return super.subSet(toElement, toInclusive, fromElement, fromInclusive).descendingSet(); - } - - @Override - public NavigableSet<V> headSet(V toElement, boolean inclusive) - { - return super.tailSet(toElement, inclusive).descendingSet(); - } - - @Override - public NavigableSet<V> tailSet(V fromElement, boolean inclusive) - { - return super.headSet(fromElement, inclusive).descendingSet(); - } - - @Override - public NavigableSet<V> descendingSet() - { - return new BTreeRange<>(this); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/Builder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/Builder.java b/src/java/org/apache/cassandra/utils/btree/Builder.java index ab78016..b835554 100644 --- a/src/java/org/apache/cassandra/utils/btree/Builder.java +++ b/src/java/org/apache/cassandra/utils/btree/Builder.java @@ -54,14 +54,14 @@ final class Builder * we assume @param source has been sorted, e.g. by BTree.update, so the update of each key resumes where * the previous left off. 
*/ - public <V> Object[] update(Object[] btree, Comparator<V> comparator, Iterable<V> source, UpdateFunction<V> updateF) + public <C, K extends C, V extends C> Object[] update(Object[] btree, Comparator<C> comparator, Iterable<K> source, UpdateFunction<K, V> updateF) { assert updateF != null; NodeBuilder current = rootBuilder; current.reset(btree, POSITIVE_INFINITY, updateF, comparator); - for (V key : source) + for (K key : source) { while (true) { @@ -96,7 +96,7 @@ final class Builder return r; } - public <V> Object[] build(Iterable<V> source, UpdateFunction<V> updateF, int size) + public <C, K extends C, V extends C> Object[] build(Iterable<K> source, UpdateFunction<K, V> updateF, int size) { assert updateF != null; @@ -107,7 +107,7 @@ final class Builder current = current.ensureChild(); current.reset(EMPTY_LEAF, POSITIVE_INFINITY, updateF, null); - for (V key : source) + for (K key : source) current.addNewKey(updateF.apply(key)); current = current.ascendToRoot(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/Path.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/Path.java b/src/java/org/apache/cassandra/utils/btree/Path.java index b1b0e03..9b6789c 100644 --- a/src/java/org/apache/cassandra/utils/btree/Path.java +++ b/src/java/org/apache/cassandra/utils/btree/Path.java @@ -104,8 +104,7 @@ public class Path<V> // search Object[] node = path[depth]; - int lb = indexes[depth]; - assert lb == 0 || forwards; + int lb = forwards ? indexes[depth] : 0; pop(); if (target instanceof BTree.Special) @@ -223,6 +222,20 @@ public class Path<V> return -1; } + byte findPredecessorParentDepth() + { + byte depth = this.depth; + depth--; + while (depth >= 0) + { + int ub = indexes[depth] - 1; + if (ub >= 0) + return depth; + depth--; + } + return -1; + } + // move to the next key in the tree void successor() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java index 9f45031..93c02ae 100644 --- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java +++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java @@ -22,17 +22,15 @@ import com.google.common.base.Function; /** * An interface defining a function to be applied to both the object we are replacing in a BTree and * the object that is intended to replace it, returning the object to actually replace it. 
- * - * @param <V> */ -public interface UpdateFunction<V> extends Function<V, V> +public interface UpdateFunction<K, V> extends Function<K, V> { /** * @param replacing the value in the original tree we have matched * @param update the value in the updating collection that matched * @return the value to insert into the new tree */ - V apply(V replacing, V update); + V apply(V replacing, K update); /** * @return true if we should fail the update @@ -44,37 +42,4 @@ public interface UpdateFunction<V> extends Function<V, V> */ void allocated(long heapSize); - public static final class NoOp<V> implements UpdateFunction<V> - { - - private static final NoOp INSTANCE = new NoOp(); - public static <V> NoOp<V> instance() - { - return INSTANCE; - } - - private NoOp() - { - } - - public V apply(V replacing, V update) - { - return update; - } - - public V apply(V update) - { - return update; - } - - public boolean abortEarly() - { - return false; - } - - public void allocated(long heapSize) - { - } - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java index baecb34..e80faca 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@ -100,6 +100,11 @@ public class Accumulator<E> implements Iterable<E> return presentCount; } + public int capacity() + { + return values.length; + } + public Iterator<E> iterator() { return new Iterator<E>()

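The retyped UpdateFunction above now distinguishes the incoming key type K from the stored value type V (both bounded by a common supertype C in the new BTree.build/update signatures), and apply(K) is applied eagerly while the tree is built rather than in a separate pass. Below is a minimal sketch of an implementation against the patched interface; the Integer/Long/Number types and the summing merge are hypothetical, chosen only to illustrate the K-to-V split, not anything in the patch itself.

import java.util.Comparator;

import org.apache.cassandra.utils.btree.UpdateFunction;

// Hypothetical illustration of the two-type-parameter contract: K (Integer) is the
// form supplied by the caller, V (Long) is the form actually stored in the tree,
// and both extend the common supertype C (Number) required by the new
// BTree.update/build signatures.
public class SummingUpdateFunction implements UpdateFunction<Integer, Long>
{
    private long heapAllocated;

    // New key with no existing match: transform the incoming K into the stored V.
    public Long apply(Integer update)
    {
        return update.longValue();
    }

    // Existing value matched: merge the incoming K into the stored V.
    public Long apply(Long replacing, Integer update)
    {
        return replacing + update;
    }

    public boolean abortEarly()
    {
        return false; // never abandon the update midway
    }

    public void allocated(long heapSize)
    {
        heapAllocated += heapSize; // track on-heap size attributed to the new nodes
    }

    // A comparator over the common supertype C = Number, as the new signatures expect.
    public static final Comparator<Number> BY_LONG_VALUE = new Comparator<Number>()
    {
        public int compare(Number a, Number b)
        {
            return Long.compare(a.longValue(), b.longValue());
        }
    };
}

With such a function, a call shaped like BTree.update(tree, SummingUpdateFunction.BY_LONG_VALUE, sortedIntegers, new SummingUpdateFunction()) matches the new four-argument signature. Note that the caller is now responsible for passing updateWith already sorted: the updateWithIsSorted flag and the internal copy-and-sort helper were removed in this change.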
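BTreeSearchIterator likewise gains a direction flag: with forwards == false it starts from the last key of the root node, climbs parents via the new Path.findPredecessorParentDepth(), and falls back to Op.FLOOR instead of Op.CEIL, while hasNext() compares against -1 rather than getKeyEnd(). A small usage sketch follows, assuming the tree was built elsewhere and that successive targets are supplied in descending order, since the iterator resumes from wherever the previous call stopped.

import java.util.Comparator;

import org.apache.cassandra.utils.btree.BTreeSearchIterator;

public class ReverseLookupExample
{
    // Hypothetical helper: returns the greatest key <= target, or null if the
    // iterator is already exhausted. With forwards == false, next(target) seeks
    // the floor of the target from the current (descending) position.
    public static <K> K floorOf(Object[] btree, Comparator<K> comparator, K target)
    {
        BTreeSearchIterator<K, K, K> iter = new BTreeSearchIterator<>(btree, comparator, false);
        return iter.hasNext() ? iter.next(target) : null;
    }
}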