Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbefa854 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbefa854 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbefa854 Branch: refs/heads/cassandra-3.0 Commit: dbefa854b403824334922dd35dc5e18ff1be51a9 Parents: c2566d1 615bf37 Author: Carl Yeksigian <c...@apache.org> Authored: Wed Jun 15 10:39:09 2016 -0400 Committer: Carl Yeksigian <c...@apache.org> Committed: Wed Jun 15 10:39:09 2016 -0400 ---------------------------------------------------------------------- .../cql3/selection/SelectionColumnMapping.java | 20 ++++++++++++++++++ .../cql3/selection/SelectionColumns.java | 20 ++++++++++++++++++ .../db/lifecycle/SSTableIntervalTree.java | 22 +++++++++++++++++++- .../cassandra/hadoop/pig/StorageHelper.java | 20 ++++++++++++++++++ .../cassandra/locator/PendingRangeMaps.java | 20 ++++++++++++++++++ .../cassandra/repair/RepairParallelism.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/CRC32Factory.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/OverlapIterator.java | 22 +++++++++++++++++++- .../utils/RMIServerSocketFactoryImpl.java | 20 ++++++++++++++++++ .../org/apache/cassandra/utils/SyncUtil.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Ref.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/concurrent/Refs.java | 20 ++++++++++++++++++ .../io/compress/CompressorPerformance.java | 20 ++++++++++++++++++ .../test/microbench/PendingRangesBench.java | 20 ++++++++++++++++++ .../selection/SelectionColumnMappingTest.java | 20 ++++++++++++++++++ .../validation/operations/SelectLimitTest.java | 20 ++++++++++++++++++ .../SelectOrderedPartitionerTest.java | 20 ++++++++++++++++++ .../gms/ArrayBackedBoundedStatsTest.java | 20 ++++++++++++++++++ .../cassandra/io/RandomAccessReaderTest.java | 20 ++++++++++++++++++ .../io/util/BufferedDataOutputStreamTest.java | 20 ++++++++++++++++++ 
.../io/util/NIODataInputStreamTest.java | 20 ++++++++++++++++++ .../cassandra/locator/PendingRangeMapsTest.java | 20 ++++++++++++++++++ .../cassandra/net/MessagingServiceTest.java | 20 ++++++++++++++++++ .../apache/cassandra/utils/TopKSamplerTest.java | 20 ++++++++++++++++++ 24 files changed, 482 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java index 33ef0af,0000000..8636f19 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java +++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumnMapping.java @@@ -1,132 -1,0 +1,152 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.cql3.selection; + +import java.util.*; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.collect.*; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnSpecification; + +/** + * Separately maintains the ColumnSpecifications and their mappings to underlying + * columns as we may receive null mappings. This occurs where a query result + * includes a column specification which does not map to any particular real + * column, e.g. COUNT queries or where no-arg functions like now() are used + */ +public class SelectionColumnMapping implements SelectionColumns +{ + private final ArrayList<ColumnSpecification> columnSpecifications; + private final HashMultimap<ColumnSpecification, ColumnDefinition> columnMappings; + + private SelectionColumnMapping() + { + this.columnSpecifications = new ArrayList<>(); + this.columnMappings = HashMultimap.create(); + } + + protected static SelectionColumnMapping newMapping() + { + return new SelectionColumnMapping(); + } + + protected static SelectionColumnMapping simpleMapping(Iterable<ColumnDefinition> columnDefinitions) + { + SelectionColumnMapping mapping = new SelectionColumnMapping(); + for (ColumnDefinition def: columnDefinitions) + mapping.addMapping(def, def); + return mapping; + } + + protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, ColumnDefinition column) + { + columnSpecifications.add(colSpec); + // functions without arguments do not map to any column, so don't + // record any mapping in that case + if (column != null) + columnMappings.put(colSpec, column); + return this; + } + + protected SelectionColumnMapping addMapping(ColumnSpecification colSpec, Iterable<ColumnDefinition> columns) + { + columnSpecifications.add(colSpec); + columnMappings.putAll(colSpec, columns); + return this; + } + + public List<ColumnSpecification> 
getColumnSpecifications() + { + // return a mutable copy as we may add extra columns + // for ordering (CASSANDRA-4911 & CASSANDRA-8286) + return Lists.newArrayList(columnSpecifications); + } + + public Multimap<ColumnSpecification, ColumnDefinition> getMappings() + { + return Multimaps.unmodifiableMultimap(columnMappings); + } + + public boolean equals(Object obj) + { + if (obj == null) + return false; + + if (!(obj instanceof SelectionColumnMapping)) + return false; + + SelectionColumns other = (SelectionColumns)obj; + return Objects.equal(columnMappings, other.getMappings()) + && Objects.equal(columnSpecifications, other.getColumnSpecifications()); + } + + public int hashCode() + { + return Objects.hashCode(columnMappings); + } + + public String toString() + { + final Function<ColumnDefinition, String> getDefName = new Function<ColumnDefinition, String>() + { + public String apply(ColumnDefinition def) + { + return def.name.toString(); + } + }; + Function<Map.Entry<ColumnSpecification, Collection<ColumnDefinition>>, String> mappingEntryToString = + new Function<Map.Entry<ColumnSpecification, Collection<ColumnDefinition>>, String>(){ + public String apply(Map.Entry<ColumnSpecification, Collection<ColumnDefinition>> entry) + { + StringBuilder builder = new StringBuilder(); + builder.append(entry.getKey().name.toString()); + builder.append(":["); + builder.append(Joiner.on(',').join(Iterables.transform(entry.getValue(), getDefName))); + builder.append("]"); + return builder.toString(); + } + }; + + Function<ColumnSpecification, String> colSpecToString = new Function<ColumnSpecification, String>() + { + public String apply(ColumnSpecification columnSpecification) + { + return columnSpecification.name.toString(); + } + }; + + StringBuilder builder = new StringBuilder(); + builder.append("{ Columns:["); + builder.append(Joiner.on(",") + .join(Iterables.transform(columnSpecifications, colSpecToString))); + builder.append("], Mappings:["); + builder.append(Joiner.on(", 
") + .join(Iterables.transform(columnMappings.asMap().entrySet(), + mappingEntryToString))); + builder.append("] }"); + return builder.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java index af334e6,0000000..151a2f3 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java +++ b/src/java/org/apache/cassandra/cql3/selection/SelectionColumns.java @@@ -1,18 -1,0 +1,38 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.cql3.selection; + +import java.util.List; + +import com.google.common.collect.Multimap; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnSpecification; + +/** + * Represents a mapping between the actual columns used to satisfy a Selection + * and the column definitions included in the resultset metadata for the query. 
+ */ +public interface SelectionColumns +{ + List<ColumnSpecification> getColumnSpecifications(); + Multimap<ColumnSpecification, ColumnDefinition> getMappings(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java index ff2abcb,0000000..841fa92 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java +++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java @@@ -1,40 -1,0 +1,60 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.db.lifecycle; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.utils.Interval; +import org.apache.cassandra.utils.IntervalTree; + +public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>> +{ + private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null); + + SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals) + { + super(intervals); + } + + public static SSTableIntervalTree empty() + { + return EMPTY; + } + + public static SSTableIntervalTree build(Iterable<SSTableReader> sstables) + { + return new SSTableIntervalTree(buildIntervals(sstables)); + } + + public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables) + { + List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables)); + for (SSTableReader sstable : sstables) + intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable)); + return intervals; + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java index d700cb7,0000000..74f734e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java +++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java @@@ -1,122 -1,0 +1,142 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. 
See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.hadoop.pig; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.Date; +import java.util.UUID; + +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.hadoop.ConfigHelper; +import org.apache.cassandra.serializers.CollectionSerializer; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +@Deprecated +public class StorageHelper +{ + // system environment variables that can be set to configure connection info: + // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper + public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT"; + public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS"; + public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER"; + public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT"; + public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS"; + public final static String 
PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER"; + public final static String PIG_RPC_PORT = "PIG_RPC_PORT"; + public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; + public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; + public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT"; + public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT"; + public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE"; + + + public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter"; + + protected static void setConnectionInformation(Configuration conf) + { + if (System.getenv(PIG_RPC_PORT) != null) + { + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT)); + } + + if (System.getenv(PIG_INPUT_RPC_PORT) != null) + ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT)); + if (System.getenv(PIG_OUTPUT_RPC_PORT) != null) + ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT)); + + if (System.getenv(PIG_INITIAL_ADDRESS) != null) + { + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS)); + } + if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS)); + if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null) + ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS)); + + if (System.getenv(PIG_PARTITIONER) != null) + { + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER)); + } + if(System.getenv(PIG_INPUT_PARTITIONER) != null) + ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER)); + if(System.getenv(PIG_OUTPUT_PARTITIONER) != null) + 
ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER)); + } + + protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion) + { + if (validator instanceof DecimalType || validator instanceof InetAddressType) + return validator.getString(value); + + if (validator instanceof CollectionType) + { + // For CollectionType, the compose() method assumes the v3 protocol format of collection, which + // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format + return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion); + } + + return validator.compose(value); + } + + /** set the value to the position of the tuple */ + protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException + { + if (value instanceof BigInteger) + pair.set(position, ((BigInteger) value).intValue()); + else if (value instanceof ByteBuffer) + pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value))); + else if (value instanceof UUID) + pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value))); + else if (value instanceof Date) + pair.set(position, TimestampType.instance.decompose((Date) value).getLong()); + else + pair.set(position, value); + } + + /** get pig type for the cassandra data type*/ + protected static byte getPigType(AbstractType type) + { + if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad + return DataType.LONG; + else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger + return DataType.INTEGER; + else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType) + return DataType.CHARARRAY; + 
else if (type instanceof FloatType) + return DataType.FLOAT; + else if (type instanceof DoubleType) + return DataType.DOUBLE; + else if (type instanceof AbstractCompositeType || type instanceof CollectionType) + return DataType.TUPLE; + + return DataType.BYTEARRAY; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/locator/PendingRangeMaps.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/locator/PendingRangeMaps.java index 1892cc3,0000000..cfeccc4 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java +++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java @@@ -1,209 -1,0 +1,229 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.locator; + +import com.google.common.collect.Iterators; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.*; + +public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddress>>> +{ + private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class); + + /** + * We have four NavigableMaps to be able to search for ranges containing a token efficiently. + * + * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges. + */ + // ascendingMap will sort the ranges by the ascending order of right token + final NavigableMap<Range<Token>, List<InetAddress>> ascendingMap; + /** + * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will + * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap. + */ + static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>() + { + @Override + public int compare(Range<Token> o1, Range<Token> o2) + { + int res = o1.right.compareTo(o2.right); + if (res != 0) + return res; + + return o2.left.compareTo(o1.left); + } + }; + + // descendingMap will sort the ranges by the descending order of left token + final NavigableMap<Range<Token>, List<InetAddress>> descendingMap; + /** + * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will + * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap. 
+ */ + static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>() + { + @Override + public int compare(Range<Token> o1, Range<Token> o2) + { + int res = o2.left.compareTo(o1.left); + if (res != 0) + return res; + + // if left tokens are same, sort by the descending of the right tokens. + return o2.right.compareTo(o1.right); + } + }; + + // these two maps are for wrap around ranges. + final NavigableMap<Range<Token>, List<InetAddress>> ascendingMapForWrapAround; + /** + * for wrap around range (begin, end], where begin > end. + * Sorting end ascending, if ends are same, sorting begin ascending, + * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in + * the tailMap. + */ + static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>() + { + @Override + public int compare(Range<Token> o1, Range<Token> o2) + { + int res = o1.right.compareTo(o2.right); + if (res != 0) + return res; + + return o1.left.compareTo(o2.left); + } + }; + + final NavigableMap<Range<Token>, List<InetAddress>> descendingMapForWrapAround; + /** + * for wrap around ranges, where begin > end. + * Sorting begin descending, if begins are same, sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin, + * and (begin, end) won't be selected in the tailMap. 
+ */ + static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>() + { + @Override + public int compare(Range<Token> o1, Range<Token> o2) + { + int res = o2.left.compareTo(o1.left); + if (res != 0) + return res; + return o1.right.compareTo(o2.right); + } + }; + + public PendingRangeMaps() + { + this.ascendingMap = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparator); + this.descendingMap = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparator); + this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(ascendingComparatorForWrapAround); + this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddress>>(descendingComparatorForWrapAround); + } + + static final void addToMap(Range<Token> range, + InetAddress address, + NavigableMap<Range<Token>, List<InetAddress>> ascendingMap, + NavigableMap<Range<Token>, List<InetAddress>> descendingMap) + { + List<InetAddress> addresses = ascendingMap.get(range); + if (addresses == null) + { + addresses = new ArrayList<InetAddress>(1); + ascendingMap.put(range, addresses); + descendingMap.put(range, addresses); + } + addresses.add(address); + } + + public void addPendingRange(Range<Token> range, InetAddress address) + { + if (Range.isWrapAround(range.left, range.right)) + { + addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround); + } + else + { + addToMap(range, address, ascendingMap, descendingMap); + } + } + + static final void addIntersections(Set<InetAddress> endpointsToAdd, + NavigableMap<Range<Token>, List<InetAddress>> smallerMap, + NavigableMap<Range<Token>, List<InetAddress>> biggerMap) + { + // find the intersection of two sets + for (Range<Token> range : smallerMap.keySet()) + { + List<InetAddress> addresses = biggerMap.get(range); + if (addresses != null) + { + endpointsToAdd.addAll(addresses); + } + } + } + + public Collection<InetAddress> pendingEndpointsFor(Token token) + { + 
Set<InetAddress> endpoints = new HashSet<>(); + + Range searchRange = new Range(token, token); + + // search for non-wrap-around maps + NavigableMap<Range<Token>, List<InetAddress>> ascendingTailMap = ascendingMap.tailMap(searchRange, true); + NavigableMap<Range<Token>, List<InetAddress>> descendingTailMap = descendingMap.tailMap(searchRange, false); + + // add intersections of two maps + if (ascendingTailMap.size() < descendingTailMap.size()) + { + addIntersections(endpoints, ascendingTailMap, descendingTailMap); + } + else + { + addIntersections(endpoints, descendingTailMap, ascendingTailMap); + } + + // search for wrap-around sets + ascendingTailMap = ascendingMapForWrapAround.tailMap(searchRange, true); + descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false); + + // add them since they are all necessary. + for (Map.Entry<Range<Token>, List<InetAddress>> entry : ascendingTailMap.entrySet()) + { + endpoints.addAll(entry.getValue()); + } + for (Map.Entry<Range<Token>, List<InetAddress>> entry : descendingTailMap.entrySet()) + { + endpoints.addAll(entry.getValue()); + } + + return endpoints; + } + + public String printPendingRanges() + { + StringBuilder sb = new StringBuilder(); + + for (Map.Entry<Range<Token>, List<InetAddress>> entry : this) + { + Range<Token> range = entry.getKey(); + + for (InetAddress address : entry.getValue()) + { + sb.append(address).append(':').append(range); + sb.append(System.getProperty("line.separator")); + } + } + + return sb.toString(); + } + + @Override + public Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator() + { + return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/repair/RepairParallelism.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/utils/CRC32Factory.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/CRC32Factory.java index bb700eb,0000000..a031f09 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/utils/CRC32Factory.java +++ b/src/java/org/apache/cassandra/utils/CRC32Factory.java @@@ -1,15 -1,0 +1,35 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++*/ +package org.apache.cassandra.utils; + + +/** + * CRC Factory that uses our pure java crc for default + */ +public class CRC32Factory extends com.github.tjake.CRC32Factory +{ + public static final CRC32Factory instance = new CRC32Factory(); + + public CRC32Factory() + { + super(PureJavaCrc32.class); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/utils/OverlapIterator.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/OverlapIterator.java index 131a749,0000000..7c1544a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/utils/OverlapIterator.java +++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java @@@ -1,54 -1,0 +1,74 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.utils; + +import java.util.*; + +/** + * A class for iterating sequentially through an ordered collection and efficiently + * finding the overlapping set of matching intervals. + * + * The algorithm is quite simple: the intervals are sorted ascending by both min and max + * in two separate lists. 
These lists are walked forwards each time we visit a new point, + * with the set of intervals in the min-ordered list being added to our set of overlaps, + * and those in the max-ordered list being removed. + */ +public class OverlapIterator<I extends Comparable<? super I>, V> +{ + // indexing into sortedByMin, tracks the next interval to include + int nextToInclude; + final List<Interval<I, V>> sortedByMin; + // indexing into sortedByMax, tracks the next interval to exclude + int nextToExclude; + final List<Interval<I, V>> sortedByMax; + final Set<V> overlaps = new HashSet<>(); + final Set<V> accessible = Collections.unmodifiableSet(overlaps); + + public OverlapIterator(Collection<Interval<I, V>> intervals) + { + sortedByMax = new ArrayList<>(intervals); + Collections.sort(sortedByMax, Interval.<I, V>maxOrdering()); + // we clone after first sorting by max; this is quite likely to make sort cheaper, since a.max < b.max + // generally increases the likelihood that a.min < b.min, so the list may be partially sorted already. 
+ // this also means if (in future) we sort either collection (or a subset thereof) by the other's comparator + // all items, including equal, will occur in the same order, including + sortedByMin = new ArrayList<>(sortedByMax); + Collections.sort(sortedByMin, Interval.<I, V>minOrdering()); + } + + // move the iterator forwards to the overlaps matching point + public void update(I point) + { + // we don't use binary search here since we expect points to be a superset of the min/max values + + // add those we are now after the start of + while (nextToInclude < sortedByMin.size() && sortedByMin.get(nextToInclude).min.compareTo(point) <= 0) + overlaps.add(sortedByMin.get(nextToInclude++).data); + // remove those we are now after the end of + while (nextToExclude < sortedByMax.size() && sortedByMax.get(nextToExclude).max.compareTo(point) < 0) + overlaps.remove(sortedByMax.get(nextToExclude++).data); + } + + public Set<V> overlaps() + { + return accessible; + } - } ++} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/utils/SyncUtil.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/SyncUtil.java index 0e83ba2,0000000..0d293aa mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/utils/SyncUtil.java +++ b/src/java/org/apache/cassandra/utils/SyncUtil.java @@@ -1,165 -1,0 +1,185 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.utils; + +import java.io.FileDescriptor; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.SyncFailedException; +import java.lang.reflect.Field; +import java.nio.MappedByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cassandra.config.Config; + +import com.google.common.base.Preconditions; + +/* + * A wrapper around various mechanisms for syncing files that makes it possible to intercept + * and skip syncing. Useful for unit tests in certain environments where syncs can have outliers + * bad enough to cause tests to run 10s of seconds longer. 
+ */ +public class SyncUtil +{ + public static boolean SKIP_SYNC = Boolean.getBoolean(Config.PROPERTY_PREFIX + "skip_sync"); + + private static final Field mbbFDField; + private static final Field fdClosedField; + private static final Field fdUseCountField; + + static + { + Field mbbFDFieldTemp = null; + try + { + mbbFDFieldTemp = MappedByteBuffer.class.getDeclaredField("fd"); + mbbFDFieldTemp.setAccessible(true); + } + catch (NoSuchFieldException e) + { + } + mbbFDField = mbbFDFieldTemp; + + //Java 8 + Field fdClosedFieldTemp = null; + try + { + fdClosedFieldTemp = FileDescriptor.class.getDeclaredField("closed"); + fdClosedFieldTemp.setAccessible(true); + } + catch (NoSuchFieldException e) + { + } + fdClosedField = fdClosedFieldTemp; + + //Java 7 + Field fdUseCountTemp = null; + try + { + fdUseCountTemp = FileDescriptor.class.getDeclaredField("useCount"); + fdUseCountTemp.setAccessible(true); + } + catch (NoSuchFieldException e) + { + } + fdUseCountField = fdUseCountTemp; + } + + public static MappedByteBuffer force(MappedByteBuffer buf) + { + Preconditions.checkNotNull(buf); + if (SKIP_SYNC) + { + Object fd = null; + try + { + if (mbbFDField != null) + { + fd = mbbFDField.get(buf); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + //This is what MappedByteBuffer.force() throws if you call force() on an unmapped buffer + if (mbbFDField != null && fd == null) + throw new UnsupportedOperationException(); + return buf; + } + else + { + return buf.force(); + } + } + + public static void sync(FileDescriptor fd) throws SyncFailedException + { + Preconditions.checkNotNull(fd); + if (SKIP_SYNC) + { + boolean closed = false; + try + { + if (fdClosedField != null) + closed = fdClosedField.getBoolean(fd); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + int useCount = 1; + try + { + if (fdUseCountField != null) + useCount = ((AtomicInteger)fdUseCountField.get(fd)).get(); + } + catch (Exception e) + { + throw new 
RuntimeException(e); + } + if (closed || !fd.valid() || useCount < 0) + throw new SyncFailedException("Closed " + closed + " valid " + fd.valid() + " useCount " + useCount); + } + else + { + fd.sync(); + } + } + + public static void force(FileChannel fc, boolean metaData) throws IOException + { + Preconditions.checkNotNull(fc); + if (SKIP_SYNC) + { + if (!fc.isOpen()) + throw new ClosedChannelException(); + } + else + { + fc.force(metaData); + } + } + + public static void sync(RandomAccessFile ras) throws IOException + { + Preconditions.checkNotNull(ras); + sync(ras.getFD()); + } + + public static void sync(FileOutputStream fos) throws IOException + { + Preconditions.checkNotNull(fos); + sync(fos.getFD()); + } + + public static void trySync(int fd) + { + if (SKIP_SYNC) + return; + else + CLibrary.trySync(fd); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/utils/concurrent/Ref.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/src/java/org/apache/cassandra/utils/concurrent/Refs.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java ---------------------------------------------------------------------- diff --cc test/long/org/apache/cassandra/io/compress/CompressorPerformance.java index 7401951,0000000..3612412 mode 100644,000000..100644 --- a/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java +++ b/test/long/org/apache/cassandra/io/compress/CompressorPerformance.java @@@ -1,99 -1,0 +1,119 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.io.compress; + +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; + +public class CompressorPerformance +{ + + static public void testPerformances() throws IOException + { + for (ICompressor compressor: new ICompressor[] { + SnappyCompressor.instance, // warm up + DeflateCompressor.instance, + LZ4Compressor.instance, + SnappyCompressor.instance + }) + { + for (BufferType in: BufferType.values()) + { + if (compressor.supports(in)) + { + for (BufferType out: BufferType.values()) + { + if (compressor.supports(out)) + { + for (int i=0; i<10; ++i) + testPerformance(compressor, in, out); + System.out.println(); + } + } + } + } + } + } + + static ByteBuffer dataSource; + static int bufLen; + + static private void testPerformance(ICompressor compressor, BufferType in, BufferType out) throws IOException + { + int len = dataSource.capacity(); + int bufLen = compressor.initialCompressedBufferLength(len); + ByteBuffer input = in.allocate(bufLen); + ByteBuffer output = out.allocate(bufLen); + + int checksum = 0; + int count = 100; + + long time = System.nanoTime(); + for (int i=0; i<count; ++i) + { + output.clear(); + compressor.compress(dataSource, output); + // Make sure not optimized away. 
+ checksum += output.get(ThreadLocalRandom.current().nextInt(output.position())); + dataSource.rewind(); + } + long timec = System.nanoTime() - time; + output.flip(); + input.put(output); + input.flip(); + + time = System.nanoTime(); + for (int i=0; i<count; ++i) + { + output.clear(); + compressor.uncompress(input, output); + // Make sure not optimized away. + checksum += output.get(ThreadLocalRandom.current().nextInt(output.position())); + input.rewind(); + } + long timed = System.nanoTime() - time; + System.out.format("Compressor %s %s->%s compress %.3f ns/b %.3f mb/s uncompress %.3f ns/b %.3f mb/s.%s\n", + compressor.getClass().getSimpleName(), + in, + out, + 1.0 * timec / (count * len), + Math.scalb(1.0e9, -20) * count * len / timec, + 1.0 * timed / (count * len), + Math.scalb(1.0e9, -20) * count * len / timed, + checksum == 0 ? " " : ""); + } + + public static void main(String[] args) throws IOException + { + try (FileInputStream fis = new FileInputStream("CHANGES.txt")) + { + int len = (int)fis.getChannel().size(); + dataSource = ByteBuffer.allocateDirect(len); + while (dataSource.hasRemaining()) { + fis.getChannel().read(dataSource); + } + dataSource.flip(); + } + testPerformances(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java ---------------------------------------------------------------------- diff --cc test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java index e50cbaf,0000000..9ec1aa6 mode 100644,000000..100644 --- a/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/PendingRangesBench.java @@@ -1,89 -1,0 +1,109 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ * ++ */ +package org.apache.cassandra.test.microbench; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.PendingRangeMaps; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 50, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(value = 3,jvmArgsAppend = "-Xmx512M") +@Threads(1) +@State(Scope.Benchmark) +public class PendingRangesBench +{ + PendingRangeMaps pendingRangeMaps; + int maxToken = 256 * 100; + + Multimap<Range<Token>, InetAddress> oldPendingRanges; + + private Range<Token> genRange(String left, String right) + { + return new Range<Token>(new RandomPartitioner.BigIntegerToken(left), new RandomPartitioner.BigIntegerToken(right)); + } + + @Setup + public void setUp() throws UnknownHostException + { + 
pendingRangeMaps = new PendingRangeMaps(); + oldPendingRanges = HashMultimap.create(); + + InetAddress[] addresses = {InetAddress.getByName("127.0.0.1"), InetAddress.getByName("127.0.0.2")}; + + for (int i = 0; i < maxToken; i++) + { + for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++) + { + Range<Token> range = genRange(Integer.toString(i * 10 + 5), Integer.toString(i * 10 + 15)); + pendingRangeMaps.addPendingRange(range, addresses[j]); + oldPendingRanges.put(range, addresses[j]); + } + } + + // add the wrap around range + for (int j = 0; j < ThreadLocalRandom.current().nextInt(2); j ++) + { + Range<Token> range = genRange(Integer.toString(maxToken * 10 + 5), Integer.toString(5)); + pendingRangeMaps.addPendingRange(range, addresses[j]); + oldPendingRanges.put(range, addresses[j]); + } + } + + @Benchmark + public void searchToken(final Blackhole bh) + { + int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5); + Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken)); + bh.consume(pendingRangeMaps.pendingEndpointsFor(searchToken)); + } + + @Benchmark + public void searchTokenForOldPendingRanges(final Blackhole bh) + { + int randomToken = ThreadLocalRandom.current().nextInt(maxToken * 10 + 5); + Token searchToken = new RandomPartitioner.BigIntegerToken(Integer.toString(randomToken)); + Set<InetAddress> endpoints = new HashSet<>(); + for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : oldPendingRanges.asMap().entrySet()) + { + if (entry.getKey().contains(searchToken)) + endpoints.addAll(entry.getValue()); + } + bh.consume(endpoints); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java index 7aaf9c9,0000000..8757b19 mode 
100644,000000..100644 --- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java +++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java @@@ -1,510 -1,0 +1,530 @@@ ++/* ++ * ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ * ++ */ +package org.apache.cassandra.cql3.selection; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.*; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SelectionColumnMappingTest extends CQLTester +{ + private static final ColumnDefinition NULL_DEF = null; + String tableName; + String typeName; + UserType userType; + String functionName; + + @BeforeClass + public static void setUpClass() + { + DatabaseDescriptor.setPartitioner(ByteOrderedPartitioner.instance); + } + + @Test + public void testSelectionColumnMapping() throws Throwable + { + // Organised as a single test to avoid the overhead of + // table creation for each variant + + typeName = createType("CREATE TYPE %s (f1 int, f2 text)"); + tableName = createTable("CREATE TABLE %s (" + + " k int PRIMARY KEY," + + " v1 int," + + " v2 ascii," + + " v3 frozen<" + typeName + ">)"); + userType = Schema.instance.getKSMetaData(KEYSPACE).userTypes.getType(ByteBufferUtil.bytes(typeName)); + functionName = createFunction(KEYSPACE, "int, ascii", + "CREATE FUNCTION %s (i int, a ascii) " + + "CALLED ON NULL INPUT " + + "RETURNS int " + + "LANGUAGE java " + + "AS 'return Integer.valueOf(i);'"); + execute("INSERT INTO %s (k, v1 ,v2, v3) VALUES (1, 1, 'foo', {f1:1, f2:'bar'})"); 
+ + testSimpleTypes(); + testWildcard(); + testSimpleTypesWithAliases(); + testUserTypes(); + testUserTypesWithAliases(); + testWritetimeAndTTL(); + testWritetimeAndTTLWithAliases(); + testFunction(); + testNoArgFunction(); + testUserDefinedFunction(); + testOverloadedFunction(); + testFunctionWithAlias(); + testNoArgumentFunction(); + testNestedFunctions(); + testNestedFunctionsWithArguments(); + testCount(); + testDuplicateFunctionsWithoutAliases(); + testDuplicateFunctionsWithAliases(); + testSelectDistinct(); + testMultipleAliasesOnSameColumn(); + testMixedColumnTypes(); + testMultipleUnaliasedSelectionOfSameColumn(); + testUserDefinedAggregate(); + } + + @Test + public void testMultipleArgumentFunction() throws Throwable + { + // demonstrate behaviour of token() with composite partition key + tableName = createTable("CREATE TABLE %s (a int, b text, PRIMARY KEY ((a, b)))"); + ColumnSpecification tokenSpec = columnSpecification("system.token(a, b)", BytesType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(tokenSpec, columnDefinitions("a", "b")); + // we don't use verify like with the other tests because this query will produce no results + SelectStatement statement = getSelect("SELECT token(a,b) FROM %s"); + verifyColumnMapping(expected, statement); + statement.executeInternal(QueryState.forInternalCalls(), QueryOptions.DEFAULT); + } + + private void testSimpleTypes() throws Throwable + { + // simple column identifiers without aliases are represented in + // ResultSet.Metadata by the underlying ColumnDefinition + ColumnSpecification kSpec = columnSpecification("k", Int32Type.instance); + ColumnSpecification v1Spec = columnSpecification("v1", Int32Type.instance); + ColumnSpecification v2Spec = columnSpecification("v2", AsciiType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(kSpec, columnDefinition("k")) + .addMapping(v1Spec, columnDefinition("v1")) + 
.addMapping(v2Spec, columnDefinition("v2")); + + verify(expected, "SELECT k, v1, v2 FROM %s"); + } + + private void testWildcard() throws Throwable + { + // Wildcard select represents each column in the table with a ColumnDefinition + // in the ResultSet metadata + ColumnDefinition kSpec = columnDefinition("k"); + ColumnDefinition v1Spec = columnDefinition("v1"); + ColumnDefinition v2Spec = columnDefinition("v2"); + ColumnDefinition v3Spec = columnDefinition("v3"); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(kSpec, columnDefinition("k")) + .addMapping(v1Spec, columnDefinition("v1")) + .addMapping(v2Spec, columnDefinition("v2")) + .addMapping(v3Spec, columnDefinition("v3")); + + verify(expected, "SELECT * FROM %s"); + } + + private void testSimpleTypesWithAliases() throws Throwable + { + // simple column identifiers with aliases are represented in ResultSet.Metadata + // by a ColumnSpecification based on the underlying ColumnDefinition + ColumnSpecification kSpec = columnSpecification("k_alias", Int32Type.instance); + ColumnSpecification v1Spec = columnSpecification("v1_alias", Int32Type.instance); + ColumnSpecification v2Spec = columnSpecification("v2_alias", AsciiType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(kSpec, columnDefinition("k")) + .addMapping(v1Spec, columnDefinition("v1")) + .addMapping(v2Spec, columnDefinition("v2")); + + verify(expected, "SELECT k AS k_alias, v1 AS v1_alias, v2 AS v2_alias FROM %s"); + } + + private void testUserTypes() throws Throwable + { + // User type fields are represented in ResultSet.Metadata by a + // ColumnSpecification denoting the name and type of the particular field + ColumnSpecification f1Spec = columnSpecification("v3.f1", Int32Type.instance); + ColumnSpecification f2Spec = columnSpecification("v3.f2", UTF8Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(f1Spec, 
columnDefinition("v3")) + .addMapping(f2Spec, columnDefinition("v3")); + + verify(expected, "SELECT v3.f1, v3.f2 FROM %s"); + } + + private void testUserTypesWithAliases() throws Throwable + { + // User type fields with aliases are represented in ResultSet.Metadata + // by a ColumnSpecification with the alias name and the type of the actual field + ColumnSpecification f1Spec = columnSpecification("f1_alias", Int32Type.instance); + ColumnSpecification f2Spec = columnSpecification("f2_alias", UTF8Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(f1Spec, columnDefinition("v3")) + .addMapping(f2Spec, columnDefinition("v3")); + + verify(expected, "SELECT v3.f1 AS f1_alias, v3.f2 AS f2_alias FROM %s"); + } + + private void testWritetimeAndTTL() throws Throwable + { + // writetime and ttl are represented in ResultSet.Metadata by a ColumnSpecification + // with the function name plus argument and a long or int type respectively + ColumnSpecification wtSpec = columnSpecification("writetime(v1)", LongType.instance); + ColumnSpecification ttlSpec = columnSpecification("ttl(v2)", Int32Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(wtSpec, columnDefinition("v1")) + .addMapping(ttlSpec, columnDefinition("v2")); + + verify(expected, "SELECT writetime(v1), ttl(v2) FROM %s"); + } + + private void testWritetimeAndTTLWithAliases() throws Throwable + { + // writetime and ttl with aliases are represented in ResultSet.Metadata + // by a ColumnSpecification with the alias name and the appropriate numeric type + ColumnSpecification wtSpec = columnSpecification("wt_alias", LongType.instance); + ColumnSpecification ttlSpec = columnSpecification("ttl_alias", Int32Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(wtSpec, columnDefinition("v1")) + .addMapping(ttlSpec, columnDefinition("v2")); + + verify(expected, "SELECT writetime(v1) AS 
wt_alias, ttl(v2) AS ttl_alias FROM %s"); + } + + private void testFunction() throws Throwable + { + // a function such as intasblob(<col>) is represented in ResultSet.Metadata + // by a ColumnSpecification with the function name plus args and the type set + // to the function's return type + ColumnSpecification fnSpec = columnSpecification("system.intasblob(v1)", BytesType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(fnSpec, columnDefinition("v1")); + + verify(expected, "SELECT intasblob(v1) FROM %s"); + } + + private void testNoArgFunction() throws Throwable + { + // a no-arg function such as now() is represented in ResultSet.Metadata + // but has no mapping to any underlying column + ColumnSpecification fnSpec = columnSpecification("system.now()", TimeUUIDType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping().addMapping(fnSpec, NULL_DEF); + + verify(expected, "SELECT now() FROM %s"); + } + + private void testOverloadedFunction() throws Throwable + { + String fnName = createFunction(KEYSPACE, "int", + "CREATE FUNCTION %s (input int) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS text " + + "LANGUAGE java " + + "AS 'return \"Hello World\";'"); + createFunctionOverload(fnName, "text", + "CREATE FUNCTION %s (input text) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS text " + + "LANGUAGE java " + + "AS 'return \"Hello World\";'"); + + createFunctionOverload(fnName, "int, text", + "CREATE FUNCTION %s (input1 int, input2 text) " + + "RETURNS NULL ON NULL INPUT " + + "RETURNS text " + + "LANGUAGE java " + + "AS 'return \"Hello World\";'"); + ColumnSpecification fnSpec1 = columnSpecification(fnName + "(v1)", UTF8Type.instance); + ColumnSpecification fnSpec2 = columnSpecification(fnName + "(v2)", UTF8Type.instance); + ColumnSpecification fnSpec3 = columnSpecification(fnName + "(v1, v2)", UTF8Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + 
.addMapping(fnSpec1, columnDefinition("v1")) + .addMapping(fnSpec2, columnDefinition("v2")) + .addMapping(fnSpec3, columnDefinitions("v1", "v2")); + + verify(expected, String.format("SELECT %1$s(v1), %1$s(v2), %1$s(v1, v2) FROM %%s", fnName)); + } + + private void testCount() throws Throwable + { + // SELECT COUNT does not necessarily include any mappings, but it must always return + // a singleton list from getColumnSpecifications() in order for the ResultSet.Metadata + // to be constructed correctly: + // * COUNT(*) / COUNT(1) do not generate any mappings, as no specific columns are referenced + // * COUNT(foo) does generate a mapping from the 'system.count' column spec to foo + ColumnSpecification count = columnSpecification("count", LongType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping().addMapping(count, NULL_DEF); + + verify(expected, "SELECT COUNT(*) FROM %s"); + verify(expected, "SELECT COUNT(1) FROM %s"); + + ColumnSpecification aliased = columnSpecification("count_alias", LongType.instance); + expected = SelectionColumnMapping.newMapping().addMapping(aliased, NULL_DEF); + + verify(expected, "SELECT COUNT(*) AS count_alias FROM %s"); + verify(expected, "SELECT COUNT(1) AS count_alias FROM %s"); + + ColumnSpecification countV1 = columnSpecification("system.count(v1)", LongType.instance); + expected = SelectionColumnMapping.newMapping().addMapping(countV1, columnDefinition("v1")); + verify(expected, "SELECT COUNT(v1) FROM %s"); + + ColumnSpecification countV1Alias = columnSpecification("count_v1", LongType.instance); + expected = SelectionColumnMapping.newMapping().addMapping(countV1Alias, columnDefinition("v1")); + verify(expected, "SELECT COUNT(v1) AS count_v1 FROM %s"); + } + + private void testUserDefinedFunction() throws Throwable + { + // UDFs are basically represented in the same way as system functions + String functionCall = String.format("%s(v1, v2)", functionName); + ColumnSpecification fnSpec = 
columnSpecification(functionCall, Int32Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(fnSpec, columnDefinitions("v1", "v2")); + verify(expected, "SELECT " + functionCall + " FROM %s"); + } + + private void testFunctionWithAlias() throws Throwable + { + // a function with an alias is represented in ResultSet.Metadata by a + // ColumnSpecification with the alias and the type set to the function's + // return type + ColumnSpecification fnSpec = columnSpecification("fn_alias", BytesType.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(fnSpec, columnDefinition("v1")); + + verify(expected, "SELECT intasblob(v1) AS fn_alias FROM %s"); + } + + public void testNoArgumentFunction() throws Throwable + { + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification("system.now()", + TimeUUIDType.instance), + NULL_DEF); + verify(expected, "SELECT now() FROM %s"); + } + + public void testNestedFunctionsWithArguments() throws Throwable + { + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification("system.blobasint(system.intasblob(v1))", + Int32Type.instance), + columnDefinition("v1")); + verify(expected, "SELECT blobasint(intasblob(v1)) FROM %s"); + } + + public void testNestedFunctions() throws Throwable + { + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification("system.tounixtimestamp(system.now())", + LongType.instance), + NULL_DEF); + verify(expected, "SELECT tounixtimestamp(now()) FROM %s"); + } + + public void testDuplicateFunctionsWithoutAliases() throws Throwable + { + // where duplicate functions are present, the ColumnSpecification list will + // contain an entry per-duplicate but the mappings will be deduplicated (i.e. 
+ // a single mapping k/v pair regardless of the number of duplicates) + ColumnSpecification spec = columnSpecification("system.intasblob(v1)", BytesType.instance); + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(spec, columnDefinition("v1")) + .addMapping(spec, columnDefinition("v1")); + verify(expected, "SELECT intasblob(v1), intasblob(v1) FROM %s"); + } + + public void testDuplicateFunctionsWithAliases() throws Throwable + { + // where duplicate functions are present with distinct aliases, they are + // represented as any other set of distinct columns would be - an entry + // in the ColumnSpecification list and a separate k/v mapping for each + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification("blob_1", BytesType.instance), + columnDefinition("v1")) + .addMapping(columnSpecification("blob_2", BytesType.instance), + columnDefinition("v1")); + verify(expected, "SELECT intasblob(v1) AS blob_1, intasblob(v1) AS blob_2 FROM %s"); + } + + public void testSelectDistinct() throws Throwable + { + SelectionColumns expected = SelectionColumnMapping.newMapping().addMapping(columnSpecification("k", + Int32Type.instance), + columnDefinition("k")); + verify(expected, "SELECT DISTINCT k FROM %s"); + + } + + private void testMultipleAliasesOnSameColumn() throws Throwable + { + // Multiple result columns derived from the same underlying column are + // represented by ColumnSpecifications + ColumnSpecification alias1 = columnSpecification("alias_1", Int32Type.instance); + ColumnSpecification alias2 = columnSpecification("alias_2", Int32Type.instance); + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(alias1, columnDefinition("v1")) + .addMapping(alias2, columnDefinition("v1")); + + verify(expected, "SELECT v1 AS alias_1, v1 AS alias_2 FROM %s"); + } + + private void testMultipleUnaliasedSelectionOfSameColumn() throws Throwable + { + // simple column identifiers 
without aliases are represented in + // ResultSet.Metadata by the underlying ColumnDefinition + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification("v1", Int32Type.instance), + columnDefinition("v1")) + .addMapping(columnSpecification("v1", Int32Type.instance), + columnDefinition("v1")); + + verify(expected, "SELECT v1, v1 FROM %s"); + } + + private void testMixedColumnTypes() throws Throwable + { + ColumnSpecification kSpec = columnSpecification("k_alias", Int32Type.instance); + ColumnSpecification v1Spec = columnSpecification("writetime(v1)", LongType.instance); + ColumnSpecification v2Spec = columnSpecification("ttl_alias", Int32Type.instance); + ColumnSpecification f1Spec = columnSpecification("v3.f1", Int32Type.instance); + ColumnSpecification f2Spec = columnSpecification("f2_alias", UTF8Type.instance); + ColumnSpecification f3Spec = columnSpecification("v3", userType); + + SelectionColumnMapping expected = SelectionColumnMapping.newMapping() + .addMapping(kSpec, columnDefinition("k")) + .addMapping(v1Spec, columnDefinition("v1")) + .addMapping(v2Spec, columnDefinition("v2")) + .addMapping(f1Spec, columnDefinition("v3")) + .addMapping(f2Spec, columnDefinition("v3")) + .addMapping(f3Spec, columnDefinition("v3")); + + + verify(expected, "SELECT k AS k_alias," + + " writetime(v1)," + + " ttl(v2) as ttl_alias," + + " v3.f1," + + " v3.f2 AS f2_alias," + + " v3" + + " FROM %s"); + } + + private void testUserDefinedAggregate() throws Throwable + { + String sFunc = parseFunctionName(createFunction(KEYSPACE, "int", + " CREATE FUNCTION %s (a int, b int)" + + " RETURNS NULL ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE javascript" + + " AS 'a + b'")).name; + + String aFunc = createAggregate(KEYSPACE, "int, int", + " CREATE AGGREGATE %s (int)" + + " SFUNC " + sFunc + + " STYPE int" + + " INITCOND 0"); + + String plusOne = createFunction(KEYSPACE, "int", + " CREATE FUNCTION %s (a int)" + + " RETURNS NULL ON NULL INPUT" + + " 
RETURNS int" + + " LANGUAGE javascript" + + " AS 'a+1'"); + + String sqFunc = createFunction(KEYSPACE, "int", + " CREATE FUNCTION %s (a int)" + + " RETURNS NULL ON NULL INPUT" + + " RETURNS int" + + " LANGUAGE javascript" + + " AS 'a*a'"); + + ColumnDefinition v1 = columnDefinition("v1"); + SelectionColumns expected = SelectionColumnMapping.newMapping() + .addMapping(columnSpecification(aFunc + "(v1)", + Int32Type.instance), + v1); + verify(expected, String.format("SELECT %s(v1) FROM %%s", aFunc)); + + // aggregate with nested udfs as input + String specName = String.format("%s(%s(%s(v1)))", aFunc, sqFunc, plusOne); + expected = SelectionColumnMapping.newMapping().addMapping(columnSpecification(specName, Int32Type.instance), + v1); + verify(expected, String.format("SELECT %s FROM %%s", specName)); + } + + private void verify(SelectionColumns expected, String query) throws Throwable + { + SelectStatement statement = getSelect(query); + verifyColumnMapping(expected, statement); + checkExecution(statement, expected.getColumnSpecifications()); + } + + private void checkExecution(SelectStatement statement, List<ColumnSpecification> expectedResultColumns) + throws RequestExecutionException, RequestValidationException + { + UntypedResultSet rs = UntypedResultSet.create(statement.executeInternal(QueryState.forInternalCalls(), + QueryOptions.DEFAULT).result); + + assertEquals(expectedResultColumns, rs.one().getColumns()); + } + + private SelectStatement getSelect(String query) throws RequestValidationException + { + CQLStatement statement = QueryProcessor.getStatement(String.format(query, KEYSPACE + "." 
+ tableName), + ClientState.forInternalCalls()).statement; + assertTrue(statement instanceof SelectStatement); + return (SelectStatement)statement; + } + + private void verifyColumnMapping(SelectionColumns expected, SelectStatement select) + { + assertEquals(expected, select.getSelection().getColumnMapping()); + } + + private Iterable<ColumnDefinition> columnDefinitions(String...names) + { + List<ColumnDefinition> defs = new ArrayList<>(); + for (String n : names) + defs.add(columnDefinition(n)); + return defs; + } + + private ColumnDefinition columnDefinition(String name) + { + return Schema.instance.getCFMetaData(KEYSPACE, tableName) + .getColumnDefinition(new ColumnIdentifier(name, true)); + + } + + private ColumnSpecification columnSpecification(String name, AbstractType<?> type) + { + return new ColumnSpecification(KEYSPACE, + tableName, + new ColumnIdentifier(name, true), + type); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbefa854/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java index 2ffa15d,e0d1ca2..9609906 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderedPartitionerTest.java @@@ -1,7 -1,25 +1,27 @@@ + /* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.cassandra.cql3.validation.operations; +import java.util.Arrays; + import org.junit.BeforeClass; import org.junit.Test;