http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/SecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java index 9221090..94031ab 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java @@ -42,12 +42,15 @@ import org.apache.cassandra.db.index.keys.KeysIndex; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.LocalByPartionerType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.concurrent.Refs; /** @@ -76,6 +79,10 @@ public abstract class SecondaryIndex */ public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values"; + public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder() + ? BytesType.instance + : new LocalByPartionerType(StorageService.getPartitioner()); + /** * Base CF that has many indexes */ @@ -296,7 +303,7 @@ public abstract class SecondaryIndex */ public DecoratedKey getIndexKeyFor(ByteBuffer value) { - return getIndexCfs().decorateKey(value); + return getIndexCfs().partitioner.decorateKey(value); } /** @@ -374,20 +381,11 @@ public abstract class SecondaryIndex */ public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def) { - return newIndexMetadata(baseMetadata, def, def.type); - } - - /** - * Create the index metadata for the index on a given column of a given table. - */ - static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator) - { if (def.getIndexType() == IndexType.CUSTOM) return null; CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def)) .withId(baseMetadata.cfId) - .withPartitioner(new LocalPartitioner(comparator)) .addPartitionKey(def.name, def.type); if (def.getIndexType() == IndexType.COMPOSITES)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index 29f235c..42861c5 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); + indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); for (ColumnDefinition def : baseMetadata.clusteringColumns()) indexMetadata.addClusteringColumn(def.name, def.type); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java index cd4aff9..6529ad9 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java @@ -24,6 +24,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.utils.concurrent.OpOrder; /** @@ -47,7 +48,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex { public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); + indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); List<ColumnDefinition> cks = baseMetadata.clusteringColumns(); for (int i = 0; i < columnDef.position(); i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index b76bf7e..d322faf 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher // *data* for a given partition. BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator()); List<CompositesIndex.IndexedEntry> entries = new ArrayList<>(); - DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey); + DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey); while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java index 478559a..7930bd6 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; @@ -41,7 +42,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex { public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef) { - indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); + indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 53a9b4a..bcaf70b 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher while (next == null && indexHits.hasNext()) { Row hit = indexHits.next(); - DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0)); + DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0)); SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(), baseCfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java new file mode 100644 index 0000000..e02ba3c --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.serializers.MarshalException; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** for sorting columns representing row keys in the row ordering as determined by a partitioner. + * Not intended for user-defined CFs, and will in fact error out if used with such. */ +public class LocalByPartionerType extends AbstractType<ByteBuffer> +{ + private final IPartitioner partitioner; + + public LocalByPartionerType(IPartitioner partitioner) + { + this.partitioner = partitioner; + } + + public static LocalByPartionerType getInstance(TypeParser parser) + { + return new LocalByPartionerType(StorageService.getPartitioner()); + } + + @Override + public ByteBuffer compose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + + @Override + public ByteBuffer decompose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + + public String getString(ByteBuffer bytes) + { + return ByteBufferUtil.bytesToHex(bytes); + } + + public ByteBuffer fromString(String source) + { + throw new UnsupportedOperationException(); + } + + @Override + public Term fromJSONObject(Object parsed) + { + throw new UnsupportedOperationException(); + } + + @Override + public String toJSONString(ByteBuffer buffer, int protocolVersion) + { + throw new UnsupportedOperationException(); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey + return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner)); + } + + @Override + public void validate(ByteBuffer bytes) throws MarshalException + { + throw new IllegalStateException("You shouldn't be validating this."); + } + + public TypeSerializer<ByteBuffer> getSerializer() + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java deleted file mode 100644 index efaea53..0000000 --- a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.marshal; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.db.PartitionPosition; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.serializers.MarshalException; - -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** for sorting columns representing row keys in the row ordering as determined by a partitioner. - * Not intended for user-defined CFs, and will in fact error out if used with such. */ -public class PartitionerDefinedOrder extends AbstractType<ByteBuffer> -{ - private final IPartitioner partitioner; - - public PartitionerDefinedOrder(IPartitioner partitioner) - { - this.partitioner = partitioner; - } - - @Override - public ByteBuffer compose(ByteBuffer bytes) - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } - - @Override - public ByteBuffer decompose(ByteBuffer bytes) - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } - - public String getString(ByteBuffer bytes) - { - return ByteBufferUtil.bytesToHex(bytes); - } - - public ByteBuffer fromString(String source) - { - throw new UnsupportedOperationException(); - } - - @Override - public Term fromJSONObject(Object parsed) - { - throw new UnsupportedOperationException(); - } - - @Override - public String toJSONString(ByteBuffer buffer, int protocolVersion) - { - throw new UnsupportedOperationException(); - } - - public int compare(ByteBuffer o1, ByteBuffer o2) - { - // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey - return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner)); - } - - @Override - public void validate(ByteBuffer bytes) throws MarshalException - { - throw new IllegalStateException("You shouldn't be validating this."); - } - - public TypeSerializer<ByteBuffer> getSerializer() - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index 25d1887..e8ec4c0 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -28,7 +28,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.ColumnFilter; @@ -43,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Locks; import org.apache.cassandra.utils.memory.MemtableAllocator; import org.apache.cassandra.utils.memory.HeapAllocator; +import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; import static org.apache.cassandra.utils.btree.BTree.Dir.desc; @@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class); public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"), - DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)), + StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)), null)); // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index f2e0617..102008f 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -102,17 +102,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true); } - public PartitionUpdate(CFMetaData metadata, - ByteBuffer key, - PartitionColumns columns, - int initialRowCapacity) - { - this(metadata, - metadata.decorateKey(key), - columns, - initialRowCapacity); - } - /** * Creates a empty immutable partition update. * @@ -145,7 +134,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition * Creates an immutable partition update that contains a single row update. * * @param metadata the metadata for the created update. - * @param key the partition key for the partition to update. + * @param key the partition key for the partition that the created update should delete. * @param row the row for the update. * * @return the newly created partition update containing only {@code row}. @@ -158,20 +147,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } /** - * Creates an immutable partition update that contains a single row update. - * - * @param metadata the metadata for the created update. - * @param key the partition key for the partition to update. - * @param row the row for the update. - * - * @return the newly created partition update containing only {@code row}. - */ - public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row) - { - return singleRowUpdate(metadata, metadata.decorateKey(key), row); - } - - /** * Turns the given iterator into an update. * * Warning: this method does not close the provided iterator, it is up to @@ -287,21 +262,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } /** - * Creates a partition update that entirely deletes a given partition. - * - * @param metadata the metadata for the created update. - * @param key the partition key for the partition that the created update should delete. - * @param timestamp the timestamp for the deletion. - * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion. - * - * @return the newly created partition deletion update. - */ - public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec) - { - return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec); - } - - /** * Merges the provided updates, yielding a new update that incorporates all those updates. * * @param updates the collection of updates to merge. This shouldn't be empty. @@ -735,39 +695,29 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } } - public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException - { - if (version >= MessagingService.VERSION_30) - { - assert key == null; // key is only there for the old format - return deserialize30(in, version, flag); - } - else - { - assert key != null; - CFMetaData metadata = deserializeMetadata(in, version); - DecoratedKey dk = metadata.decorateKey(key); - return deserializePre30(in, version, flag, metadata, dk); - } - } - - // Used to share same decorated key between updates. public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException { - if (version >= MessagingService.VERSION_30) - { - return deserialize30(in, version, flag); - } - else + if (version < MessagingService.VERSION_30) { assert key != null; - CFMetaData metadata = deserializeMetadata(in, version); - return deserializePre30(in, version, flag, metadata, key); + + // This is only used in mutation, and mutation have never allowed "null" column families + boolean present = in.readBoolean(); + assert present; + + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); + int size = in.readInt(); + Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); + SerializationHelper helper = new SerializationHelper(metadata, version, flag); + try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper)) + { + return PartitionUpdate.fromIterator(iterator); + } } - } - private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException - { + assert key == null; // key is only there for the old format + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag); if (header.isEmpty) @@ -802,28 +752,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition false); } - private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException - { - // This is only used in mutation, and mutation have never allowed "null" column families - boolean present = in.readBoolean(); - assert present; - - CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - return metadata; - } - - private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException - { - LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); - int size = in.readInt(); - Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); - SerializationHelper helper = new SerializationHelper(metadata, version, flag); - try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper)) - { - return PartitionUpdate.fromIterator(iterator); - } - } - public long serializedSize(PartitionUpdate update, int version) { if (version < MessagingService.VERSION_30) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 531bd26..b96e0b1 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -27,6 +27,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -169,7 +170,7 @@ public class UnfilteredRowIteratorSerializer public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException { - DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in)); + DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); boolean isReversed = (flags & IS_REVERSED) != 0; if ((flags & IS_EMPTY) != 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java index f36abae..082c71d 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedView.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java @@ -303,10 +303,9 @@ public class MaterializedView partitionKey[i] = value; } - CFMetaData metadata = getViewCfs().metadata; - return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata - .getKeyValidatorAsClusteringComparator() - .make(partitionKey))); + return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata + .getKeyValidatorAsClusteringComparator() + .make(partitionKey))); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java index d0ba5ea..53e4e91 100644 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -377,7 +377,7 @@ public class TemporalRow this.baseCfs = baseCfs; this.viewPrimaryKey = viewPrimaryKey; this.key = key; - this.dk = baseCfs.decorateKey(key); + this.dk = baseCfs.partitioner.decorateKey(key); this.clusteringToRow = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 31fda34..2cb7f61 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport // if user specified tokens, use those if (initialTokens.size() > 0) return getSpecifiedTokens(metadata, initialTokens); - + int numTokens = DatabaseDescriptor.getNumTokens(); if (numTokens < 1) throw new ConfigurationException("num_tokens must be >= 1"); @@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport } private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata, - Collection<String> initialTokens) + Collection<String> initialTokens) { logger.debug("tokens manually specified as {}", initialTokens); List<Token> tokens = new ArrayList<>(initialTokens.size()); for (String tokenString : initialTokens) { - Token token = metadata.partitioner.getTokenFactory().fromString(tokenString); + Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString); if (metadata.getEndpoint(token) != null) throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first)."); tokens.add(token); @@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport if (ks == null) throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace); AbstractReplicationStrategy rs = ks.getReplicationStrategy(); - - return TokenAllocation.allocateTokens(metadata, rs, address, numTokens); + + return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens); } public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens) @@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport Set<Token> tokens = new HashSet<>(numTokens); while (tokens.size() < numTokens) { - Token token = metadata.partitioner.getRandomToken(); + Token token = StorageService.getPartitioner().getRandomToken(); if (metadata.getEndpoint(token) == null) tokens.add(token); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java index 46872c1..d7139d0 100644 --- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java +++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java @@ -300,9 +300,4 @@ public class ByteOrderedPartitioner implements IPartitioner { return BytesType.instance; } - - public AbstractType<?> partitionOrdering() - { - return BytesType.instance; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/IPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index e0a08dc..b22da66 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -78,10 +78,4 @@ public interface IPartitioner public Map<Token, Float> describeOwnership(List<Token> sortedTokens); public AbstractType<?> getTokenValidator(); - - /** - * Abstract type that orders the same way as DecoratedKeys provided by this partitioner. - * Used by secondary indices. - */ - public AbstractType<?> partitionOrdering(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/LocalPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index 2a5a16e..01dc75e 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -84,11 +84,6 @@ public class LocalPartitioner implements IPartitioner return comparator; } - public AbstractType<?> partitionOrdering() - { - return comparator; - } - public class LocalToken extends ComparableObjectToken<ByteBuffer> { static final long serialVersionUID = 8437543776403014875L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index d68be3f..003879c 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -26,7 +26,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PreHashedDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.PartitionerDefinedOrder; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -46,7 +45,6 @@ public class Murmur3Partitioner implements IPartitioner private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM); public static final Murmur3Partitioner instance = new Murmur3Partitioner(); - public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance); public DecoratedKey decorateKey(ByteBuffer key) { @@ -290,9 +288,4 @@ public class Murmur3Partitioner implements IPartitioner { return LongType.instance; } - - public AbstractType<?> partitionOrdering() - { - return partitionOrdering; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index 45c2cfa..ae0326f 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -240,9 +240,4 @@ public class OrderPreservingPartitioner implements IPartitioner { return UTF8Type.instance; } - - public AbstractType<?> partitionOrdering() - { - return UTF8Type.instance; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index b0dea01..71a0a99 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -29,7 +29,6 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.IntegerType; -import org.apache.cassandra.db.marshal.PartitionerDefinedOrder; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.GuidGenerator; @@ -48,7 +47,6 @@ public class RandomPartitioner implements IPartitioner private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1)))); public static final RandomPartitioner instance = new RandomPartitioner(); - public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance); public DecoratedKey decorateKey(ByteBuffer key) { @@ -198,9 +196,4 @@ public class RandomPartitioner implements IPartitioner { return IntegerType.instance; } - - public AbstractType<?> partitionOrdering() - { - return partitionOrdering; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index e7624c3..68c8a11 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -333,7 +333,7 @@ public class RangeStreamer Collection<Range<Token>> ranges = entry.getValue().getValue(); // filter out already streamed ranges - Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner); + Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner()); if (ranges.removeAll(availableRanges)) { logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index b4281ce..dd3a02b 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -33,6 +33,7 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractReplicationStrategy; @@ -48,11 +49,12 @@ public class TokenAllocation public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, + final IPartitioner partitioner, final InetAddress endpoint, int numTokens) { StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint); - Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens); + Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens); tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens); if (logger.isWarnEnabled()) @@ -139,7 +141,7 @@ public class TokenAllocation return stat; } - static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy) + static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner) { NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>(); for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet()) @@ -147,7 +149,7 @@ public class TokenAllocation if (strategy.inAllocationRing(en.getValue())) sortedTokens.put(en.getKey(), en.getValue()); } - return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner); + return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner); } interface StrategyAdapter extends ReplicationStrategy<InetAddress> http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 5fa402a..e61a35a 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean JVMStabilityInspector.inspectThrowable(th); // TODO this is broken logger.warn("Unable to calculate tokens for {}. Will use a random one", address); - tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken()); + tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken()); } int generation = epState.getHeartBeatState().getGeneration(); int heartbeat = epState.getHeartBeatState().getHeartBeatVersion(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index f4b4da8..acc9141 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.io.Closeable; -import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -31,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.Pair; @@ -46,11 +46,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat(); protected static AtomicInteger generation = new AtomicInteger(0); - protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) + protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns) { this.metadata = metadata; this.directory = directory; this.columns = columns; + DatabaseDescriptor.setPartitioner(partitioner); } protected void setSSTableFormatType(SSTableFormat.Type type) @@ -102,11 +103,6 @@ abstract class AbstractSSTableSimpleWriter implements Closeable return maxGen; } - PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException - { - return getUpdateFor(metadata.decorateKey(key)); - } - /** * Returns a PartitionUpdate suitable to write on this writer for the provided key. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 7ae5651..43e214b 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -77,9 +77,6 @@ public class CQLSSTableWriter implements Closeable static { Config.setClientMode(true); - // Partitioner is not set in client mode. - if (DatabaseDescriptor.getPartitioner() == null) - DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); } private final AbstractSSTableSimpleWriter writer; @@ -222,7 +219,10 @@ public class CQLSSTableWriter implements Closeable try { for (ByteBuffer key : keys) - insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params); + { + DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key); + insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params); + } return this; } catch (SSTableSimpleUnsortedWriter.SyncException e) @@ -277,6 +277,7 @@ public class CQLSSTableWriter implements Closeable public static class Builder { private File directory; + private IPartitioner partitioner = Murmur3Partitioner.instance; protected SSTableFormat.Type formatType = null; @@ -401,7 +402,7 @@ public class CQLSSTableWriter implements Closeable */ public Builder withPartitioner(IPartitioner partitioner) { - this.schema = schema.copy(partitioner); + this.partitioner = partitioner; return this; } @@ -510,8 +511,8 @@ public class CQLSSTableWriter implements Closeable throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); AbstractSSTableSimpleWriter writer = sorted - ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB); + ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB); if (formatType != null) writer.setSSTableFormatType(formatType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index 8fb300b..5de2452 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -23,10 +23,8 @@ import java.io.IOException; import com.google.common.collect.AbstractIterator; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -82,13 +80,11 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close } private final In in; - private final IPartitioner partitioner; - public KeyIterator(Descriptor desc, CFMetaData metadata) + public KeyIterator(Descriptor desc) { in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX))); - partitioner = metadata.partitioner; } protected DecoratedKey computeNext() @@ -98,7 +94,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close if (in.isEOF()) return endOfData(); - DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get())); + DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get())); RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry return key; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java index bbc56cc..f1e01f2 100644 --- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java @@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey> { iters = new ArrayList<>(sstables.size()); for (SSTableReader sstable : sstables) - iters.add(new KeyIterator(sstable.descriptor, sstable.metadata)); + iters.add(new KeyIterator(sstable.descriptor)); } private void maybeInit() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index b86d9b4..516534d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -18,7 +18,6 @@ package org.apache.cassandra.io.sstable; import java.io.*; -import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; @@ -64,29 +63,31 @@ public abstract class SSTable public final Descriptor descriptor; protected final Set<Component> components; public final CFMetaData metadata; + public final IPartitioner partitioner; public final boolean compression; public DecoratedKey first; public DecoratedKey last; - protected SSTable(Descriptor descriptor, CFMetaData metadata) + protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) { - this(descriptor, new HashSet<>(), metadata); + this(descriptor, new HashSet<>(), metadata, partitioner); } - protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata) + protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) { // In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without // full schema definition. SSTableLoader use that ability assert descriptor != null; assert components != null; - assert metadata != null; + assert partitioner != null; this.descriptor = descriptor; Set<Component> dataComponents = new HashSet<>(components); this.compression = dataComponents.contains(Component.COMPRESSION_INFO); this.components = new CopyOnWriteArraySet<>(dataComponents); this.metadata = metadata; + this.partitioner = partitioner; } /** @@ -120,16 +121,6 @@ public abstract class SSTable return true; } - public IPartitioner getPartitioner() - { - return metadata.partitioner; - } - - public DecoratedKey decorateKey(ByteBuffer key) - { - return getPartitioner().decorateKey(key); - } - /** * If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only * as large as necessary. http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 20c3962..f25d3ff 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler // To conserve memory, open SSTableReaders without bloom filters and discard // the index summary after calculating the file sections to stream and the estimated // number of keys for each endpoint. See CASSANDRA-5555 for details. - SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata); + SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner()); sstables.add(sstable); // calculate the sstable sections to stream as well as the estimated number of @@ -252,6 +252,7 @@ public class SSTableLoader implements StreamEventHandler public static abstract class Client { private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>(); + private IPartitioner partitioner; /** * Initialize the client. @@ -298,6 +299,23 @@ public class SSTableLoader implements StreamEventHandler return endpointToRanges; } + protected void setPartitioner(String partclass) throws ConfigurationException + { + setPartitioner(FBUtilities.newPartitioner(partclass)); + } + + protected void setPartitioner(IPartitioner partitioner) + { + this.partitioner = partitioner; + // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner + DatabaseDescriptor.setPartitioner(partitioner); + } + + public IPartitioner getPartitioner() + { + return partitioner; + } + protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint) { Collection<Range<Token>> ranges = endpointToRanges.get(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index f4b9adf..a70b92f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -33,6 +33,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredSerializer; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.JVMStabilityInspector; /** @@ -59,9 +60,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>(); private final DiskWriter diskWriter = new DiskWriter(); - SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB) + SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB) { - super(directory, metadata, columns); + super(directory, metadata, partitioner, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS); diskWriter.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index 45722cd..b22a048 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter private SSTableTxnWriter writer; - protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) + protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns) { - super(directory, metadata, columns); + super(directory, metadata, partitioner, columns); } private SSTableTxnWriter getOrCreateWriter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 5ceced5..6d39d2d 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -27,11 +27,10 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; @@ -43,23 +42,27 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.lifecycle.TransactionLogs; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.format.big.BigTableWriter; import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.SelfRefCounted; @@ -348,18 +351,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException { - return open(desc, componentsFor(desc), metadata); + IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR) + ? new LocalPartitioner(metadata.getKeyValidator()) + : StorageService.getPartitioner(); + return open(desc, componentsFor(desc), metadata, p); } - public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException + public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return open(descriptor, components, metadata, true, true); + return open(descriptor, components, metadata, partitioner, true, true); } // use only for offline or "Standalone" operations public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException { - return open(descriptor, components, cfs.metadata, false, false); // do not track hotness + return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness } /** @@ -368,10 +374,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * @param descriptor * @param components * @param metadata + * @param partitioner * @return opened SSTableReader * @throws IOException */ - public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException + public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { // Minimum components without which we can't do anything assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor; @@ -387,7 +394,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. // In that case, we skip the check. - String partitionerName = metadata.partitioner.getClass().getCanonicalName(); + String partitionerName = partitioner.getClass().getCanonicalName(); if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) { logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", @@ -399,6 +406,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable = internalOpen(descriptor, components, metadata, + partitioner, System.currentTimeMillis(), statsMetadata, OpenReason.NORMAL, @@ -423,6 +431,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, + IPartitioner partitioner, boolean validate, boolean trackHotness) throws IOException { @@ -443,7 +452,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. // In that case, we skip the check. - String partitionerName = metadata.partitioner.getClass().getCanonicalName(); + String partitionerName = partitioner.getClass().getCanonicalName(); if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) { logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", @@ -455,6 +464,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable = internalOpen(descriptor, components, metadata, + partitioner, System.currentTimeMillis(), statsMetadata, OpenReason.NORMAL, @@ -492,7 +502,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries, - final CFMetaData metadata) + final CFMetaData metadata, + final IPartitioner partitioner) { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>(); @@ -506,7 +517,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable; try { - sstable = open(entry.getKey(), entry.getValue(), metadata); + sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner); } catch (CorruptSSTableException ex) { @@ -551,6 +562,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, + IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, @@ -560,9 +572,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS OpenReason openReason, SerializationHeader header) { - assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; + assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; - SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header); + SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); reader.bf = bf; reader.ifile = ifile; @@ -577,6 +589,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private static SSTableReader internalOpen(final Descriptor descriptor, Set<Component> components, CFMetaData metadata, + IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, @@ -584,18 +597,19 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { Factory readerFactory = descriptor.getFormat().getReaderFactory(); - return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header); + return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } protected SSTableReader(final Descriptor desc, Set<Component> components, CFMetaData metadata, + IPartitioner partitioner, long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) { - super(desc, components, metadata); + super(desc, components, metadata, partitioner); this.sstableMetadata = sstableMetadata; this.header = header; this.maxDataAge = maxDataAge; @@ -800,7 +814,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex); - DecoratedKey decoratedKey = decorateKey(key); + DecoratedKey decoratedKey = partitioner.decorateKey(key); if (first == null) first = decoratedKey; last = decoratedKey; @@ -818,7 +832,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } if (!summaryLoaded) - indexSummary = summaryBuilder.build(getPartitioner()); + indexSummary = summaryBuilder.build(partitioner); } } @@ -848,10 +862,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { iStream = new DataInputStream(new FileInputStream(summariesFile)); indexSummary = IndexSummary.serializer.deserialize( - iStream, getPartitioner(), descriptor.version.hasSamplingLevel(), + iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); - first = decorateKey(ByteBufferUtil.readWithLength(iStream)); - last = decorateKey(ByteBufferUtil.readWithLength(iStream)); + first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); dbuilder.deserializeBounds(iStream); } @@ -1050,6 +1064,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader replacement = internalOpen(descriptor, components, metadata, + partitioner, ifile != null ? ifile.sharedCopy() : null, dfile.sharedCopy(), newSummary, @@ -1153,7 +1168,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS else if (samplingLevel < indexSummary.getSamplingLevel()) { // we can use the existing index summary to make a smaller one - newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner()); + newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression)) @@ -1188,11 +1203,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS long indexPosition; while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { - summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition); + summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition); RowIndexEntry.Serializer.skip(primaryIndex); } - return summaryBuilder.build(getPartitioner()); + return summaryBuilder.build(partitioner); } } finally @@ -1289,8 +1304,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata(); - // We need the parent cf metadata - String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName; + //We need the parent cf metadata + String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName; cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName)); return cmd; @@ -1462,7 +1477,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public DecoratedKey next() { byte[] bytes = indexSummary.getKey(idx++); - return decorateKey(ByteBuffer.wrap(bytes)); + return partitioner.decorateKey(ByteBuffer.wrap(bytes)); } public void remove() @@ -1604,7 +1619,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS while (!in.isEOF()) { ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); - DecoratedKey indexDecoratedKey = decorateKey(indexKey); + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); if (indexDecoratedKey.compareTo(token) > 0) return indexDecoratedKey; @@ -2285,6 +2300,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public abstract SSTableReader open(final Descriptor descriptor, Set<Component> components, CFMetaData metadata, + IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index fa691b8..08a9dcc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -27,11 +27,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; @@ -72,10 +74,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, CFMetaData metadata, + IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header) { - super(descriptor, components(metadata), metadata); + super(descriptor, components(metadata), metadata, partitioner); this.keyCount = keyCount; this.repairedAt = repairedAt; this.metadataCollector = metadataCollector; @@ -87,18 +90,19 @@ public abstract class SSTableWriter extends SSTable implements Transactional Long keyCount, Long repairedAt, CFMetaData metadata, + IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn); } public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn); + return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn); } public static SSTableWriter create(CFMetaData metadata, @@ -106,11 +110,12 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, int sstableLevel, + IPartitioner partitioner, SerializationHeader header, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn); + return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn); } public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn) @@ -250,7 +255,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected Map<MetadataType, MetadataComponent> finalizeMetadata() { - return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(), + return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), metadata.getBloomFilterFpChance(), repairedAt, header); @@ -282,6 +287,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, CFMetaData metadata, + IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index 860cd9f..a072d4d 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -23,6 +23,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableFormat; @@ -85,20 +86,21 @@ public class BigFormat implements SSTableFormat long keyCount, long repairedAt, CFMetaData metadata, + IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn); } } static class ReaderFactory extends SSTableReader.Factory { @Override - public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header) + public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header) { - return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header); + return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a22ce89e/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index 87608fd..b539c79 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader { private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); - BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) + BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) { - super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header); + super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); } public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift) @@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader } else { - DecoratedKey indexDecoratedKey = decorateKey(indexKey); + DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); int comparison = indexDecoratedKey.compareTo(key); int v = op.apply(comparison); opSatisfied = (v == 0); @@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader // expensive sanity check! see CASSANDRA-4687 try (FileDataInput fdi = dfile.getSegment(indexEntry.position)) { - DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi)); if (!keyInDisk.equals(key)) throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); }
