http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/SecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java index 94031ab..9221090 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java @@ -42,15 +42,12 @@ import org.apache.cassandra.db.index.keys.KeysIndex; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.LocalByPartionerType; +import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; - import org.apache.cassandra.utils.concurrent.Refs; /** @@ -79,10 +76,6 @@ public abstract class SecondaryIndex */ public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values"; - public static final AbstractType<?> keyComparator = StorageService.getPartitioner().preservesOrder() - ? BytesType.instance - : new LocalByPartionerType(StorageService.getPartitioner()); - /** * Base CF that has many indexes */ @@ -303,7 +296,7 @@ public abstract class SecondaryIndex */ public DecoratedKey getIndexKeyFor(ByteBuffer value) { - return getIndexCfs().partitioner.decorateKey(value); + return getIndexCfs().decorateKey(value); } /** @@ -381,11 +374,20 @@ public abstract class SecondaryIndex */ public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def) { + return newIndexMetadata(baseMetadata, def, def.type); + } + + /** + * Create the index metadata for the index on a given column of a given table. + */ + static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def, AbstractType<?> comparator) + { if (def.getIndexType() == IndexType.CUSTOM) return null; CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def)) .withId(baseMetadata.cfId) + .withPartitioner(new LocalPartitioner(comparator)) .addPartitionKey(def.name, def.type); if (def.getIndexType() == IndexType.COMPOSITES)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index 42861c5..29f235c 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -101,7 +101,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); + indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); for (ColumnDefinition def : baseMetadata.clusteringColumns()) indexMetadata.addClusteringColumn(def.name, def.type); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java index 6529ad9..cd4aff9 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java @@ -24,7 +24,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.utils.concurrent.OpOrder; /** @@ -48,7 +47,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex { public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); + indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); List<ColumnDefinition> cks = baseMetadata.clusteringColumns(); for (int i = 0; i < columnDef.position(); i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index d322faf..b76bf7e 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -111,7 +111,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher // *data* for a given partition. BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator()); List<CompositesIndex.IndexedEntry> entries = new ArrayList<>(); - DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey); + DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey); while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java index 7930bd6..478559a 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java @@ -23,7 +23,6 @@ import java.util.Set; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; @@ -42,7 +41,7 @@ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex { public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef) { - indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); + indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index bcaf70b..53a9b4a 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -84,7 +84,7 @@ public class KeysSearcher extends SecondaryIndexSearcher while (next == null && indexHits.hasNext()) { Row hit = indexHits.next(); - DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0)); + DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0)); SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(), baseCfs.metadata, http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java deleted file mode 100644 index e02ba3c..0000000 --- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.marshal; - -import java.nio.ByteBuffer; - -import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.db.PartitionPosition; -import org.apache.cassandra.serializers.TypeSerializer; -import org.apache.cassandra.serializers.MarshalException; - -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; - -/** for sorting columns representing row keys in the row ordering as determined by a partitioner. - * Not intended for user-defined CFs, and will in fact error out if used with such. */ -public class LocalByPartionerType extends AbstractType<ByteBuffer> -{ - private final IPartitioner partitioner; - - public LocalByPartionerType(IPartitioner partitioner) - { - this.partitioner = partitioner; - } - - public static LocalByPartionerType getInstance(TypeParser parser) - { - return new LocalByPartionerType(StorageService.getPartitioner()); - } - - @Override - public ByteBuffer compose(ByteBuffer bytes) - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } - - @Override - public ByteBuffer decompose(ByteBuffer bytes) - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } - - public String getString(ByteBuffer bytes) - { - return ByteBufferUtil.bytesToHex(bytes); - } - - public ByteBuffer fromString(String source) - { - throw new UnsupportedOperationException(); - } - - @Override - public Term fromJSONObject(Object parsed) - { - throw new UnsupportedOperationException(); - } - - @Override - public String toJSONString(ByteBuffer buffer, int protocolVersion) - { - throw new UnsupportedOperationException(); - } - - public int compare(ByteBuffer o1, ByteBuffer o2) - { - // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey - return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner)); - } - - @Override - public void validate(ByteBuffer bytes) throws MarshalException - { - throw new IllegalStateException("You shouldn't be validating this."); - } - - public TypeSerializer<ByteBuffer> getSerializer() - { - throw new UnsupportedOperationException("You can't do this with a local partitioner."); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java new file mode 100644 index 0000000..efaea53 --- /dev/null +++ b/src/java/org/apache/cassandra/db/marshal/PartitionerDefinedOrder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.marshal; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.serializers.MarshalException; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.utils.ByteBufferUtil; + +/** for sorting columns representing row keys in the row ordering as determined by a partitioner. + * Not intended for user-defined CFs, and will in fact error out if used with such. */ +public class PartitionerDefinedOrder extends AbstractType<ByteBuffer> +{ + private final IPartitioner partitioner; + + public PartitionerDefinedOrder(IPartitioner partitioner) + { + this.partitioner = partitioner; + } + + @Override + public ByteBuffer compose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + + @Override + public ByteBuffer decompose(ByteBuffer bytes) + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } + + public String getString(ByteBuffer bytes) + { + return ByteBufferUtil.bytesToHex(bytes); + } + + public ByteBuffer fromString(String source) + { + throw new UnsupportedOperationException(); + } + + @Override + public Term fromJSONObject(Object parsed) + { + throw new UnsupportedOperationException(); + } + + @Override + public String toJSONString(ByteBuffer buffer, int protocolVersion) + { + throw new UnsupportedOperationException(); + } + + public int compare(ByteBuffer o1, ByteBuffer o2) + { + // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey + return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner)); + } + + @Override + public void validate(ByteBuffer bytes) throws MarshalException + { + throw new IllegalStateException("You shouldn't be validating this."); + } + + public TypeSerializer<ByteBuffer> getSerializer() + { + throw new UnsupportedOperationException("You can't do this with a local partitioner."); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index e8ec4c0..25d1887 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.ColumnFilter; @@ -42,7 +43,6 @@ import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Locks; import org.apache.cassandra.utils.memory.MemtableAllocator; import org.apache.cassandra.utils.memory.HeapAllocator; -import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; import static org.apache.cassandra.utils.btree.BTree.Dir.desc; @@ -59,7 +59,7 @@ public class AtomicBTreePartition implements Partition private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class); public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"), - StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)), + DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)), null)); // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 102008f..f2e0617 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -102,6 +102,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true); } + public PartitionUpdate(CFMetaData metadata, + ByteBuffer key, + PartitionColumns columns, + int initialRowCapacity) + { + this(metadata, + metadata.decorateKey(key), + columns, + initialRowCapacity); + } + /** * Creates a empty immutable partition update. * @@ -134,7 +145,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition * Creates an immutable partition update that contains a single row update. * * @param metadata the metadata for the created update. - * @param key the partition key for the partition that the created update should delete. + * @param key the partition key for the partition to update. * @param row the row for the update. * * @return the newly created partition update containing only {@code row}. @@ -147,6 +158,20 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } /** + * Creates an immutable partition update that contains a single row update. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the partition to update. + * @param row the row for the update. + * + * @return the newly created partition update containing only {@code row}. + */ + public static PartitionUpdate singleRowUpdate(CFMetaData metadata, ByteBuffer key, Row row) + { + return singleRowUpdate(metadata, metadata.decorateKey(key), row); + } + + /** * Turns the given iterator into an update. * * Warning: this method does not close the provided iterator, it is up to @@ -262,6 +287,21 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } /** + * Creates a partition update that entirely deletes a given partition. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the partition that the created update should delete. + * @param timestamp the timestamp for the deletion. + * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion. + * + * @return the newly created partition deletion update. + */ + public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, ByteBuffer key, long timestamp, int nowInSec) + { + return fullPartitionDelete(metadata, metadata.decorateKey(key), timestamp, nowInSec); + } + + /** * Merges the provided updates, yielding a new update that incorporates all those updates. * * @param updates the collection of updates to merge. This shouldn't be empty. @@ -695,29 +735,39 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } } - public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException + public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException { - if (version < MessagingService.VERSION_30) + if (version >= MessagingService.VERSION_30) + { + assert key == null; // key is only there for the old format + return deserialize30(in, version, flag); + } + else { assert key != null; - - // This is only used in mutation, and mutation have never allowed "null" column families - boolean present = in.readBoolean(); - assert present; - - CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); - int size = in.readInt(); - Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); - SerializationHelper helper = new SerializationHelper(metadata, version, flag); - try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper)) - { - return PartitionUpdate.fromIterator(iterator); - } + CFMetaData metadata = deserializeMetadata(in, version); + DecoratedKey dk = metadata.decorateKey(key); + return deserializePre30(in, version, flag, metadata, dk); } + } - assert key == null; // key is only there for the old format + // Used to share same decorated key between updates. + public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException + { + if (version >= MessagingService.VERSION_30) + { + return deserialize30(in, version, flag); + } + else + { + assert key != null; + CFMetaData metadata = deserializeMetadata(in, version); + return deserializePre30(in, version, flag, metadata, key); + } + } + private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + { CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag); if (header.isEmpty) @@ -752,6 +802,28 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition false); } + private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException + { + // This is only used in mutation, and mutation have never allowed "null" column families + boolean present = in.readBoolean(); + assert present; + + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + return metadata; + } + + private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException + { + LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); + int size = in.readInt(); + Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); + SerializationHelper helper = new SerializationHelper(metadata, version, flag); + try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper)) + { + return PartitionUpdate.fromIterator(iterator); + } + } + public long serializedSize(PartitionUpdate update, int version) { if (version < MessagingService.VERSION_30) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index b96e0b1..531bd26 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -27,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -170,7 +169,7 @@ public class UnfilteredRowIteratorSerializer public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException { - DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in)); + DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); boolean isReversed = (flags & IS_REVERSED) != 0; if ((flags & IS_EMPTY) != 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java index 082c71d..f36abae 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedView.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java @@ -303,9 +303,10 @@ public class MaterializedView partitionKey[i] = value; } - return getViewCfs().partitioner.decorateKey(CFMetaData.serializePartitionKey(getViewCfs().metadata - .getKeyValidatorAsClusteringComparator() - .make(partitionKey))); + CFMetaData metadata = getViewCfs().metadata; + return metadata.decorateKey(CFMetaData.serializePartitionKey(metadata + .getKeyValidatorAsClusteringComparator() + .make(partitionKey))); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java index 53e4e91..d0ba5ea 100644 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -377,7 +377,7 @@ public class TemporalRow this.baseCfs = baseCfs; this.viewPrimaryKey = viewPrimaryKey; this.key = key; - this.dk = baseCfs.partitioner.decorateKey(key); + this.dk = baseCfs.decorateKey(key); this.clusteringToRow = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 2cb7f61..31fda34 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -164,7 +164,7 @@ public class BootStrapper extends ProgressEventNotifierSupport // if user specified tokens, use those if (initialTokens.size() > 0) return getSpecifiedTokens(metadata, initialTokens); - + int numTokens = DatabaseDescriptor.getNumTokens(); if (numTokens < 1) throw new ConfigurationException("num_tokens must be >= 1"); @@ -179,13 +179,13 @@ public class BootStrapper extends ProgressEventNotifierSupport } private static Collection<Token> getSpecifiedTokens(final TokenMetadata metadata, - Collection<String> initialTokens) + Collection<String> initialTokens) { logger.debug("tokens manually specified as {}", initialTokens); List<Token> tokens = new ArrayList<>(initialTokens.size()); for (String tokenString : initialTokens) { - Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString); + Token token = metadata.partitioner.getTokenFactory().fromString(tokenString); if (metadata.getEndpoint(token) != null) throw new ConfigurationException("Bootstrapping to existing token " + tokenString + " is not allowed (decommission/removenode the old node first)."); tokens.add(token); @@ -202,8 +202,8 @@ public class BootStrapper extends ProgressEventNotifierSupport if (ks == null) throw new ConfigurationException("Problem opening token allocation keyspace " + allocationKeyspace); AbstractReplicationStrategy rs = ks.getReplicationStrategy(); - - return TokenAllocation.allocateTokens(metadata, rs, StorageService.getPartitioner(), address, numTokens); + + return TokenAllocation.allocateTokens(metadata, rs, address, numTokens); } public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens) @@ -211,7 +211,7 @@ public class BootStrapper extends ProgressEventNotifierSupport Set<Token> tokens = new HashSet<>(numTokens); while (tokens.size() < numTokens) { - Token token = StorageService.getPartitioner().getRandomToken(); + Token token = metadata.partitioner.getRandomToken(); if (metadata.getEndpoint(token) == null) tokens.add(token); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java index d7139d0..46872c1 100644 --- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java +++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java @@ -300,4 +300,9 @@ public class ByteOrderedPartitioner implements IPartitioner { return BytesType.instance; } + + public AbstractType<?> partitionOrdering() + { + return BytesType.instance; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/IPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index b22da66..e0a08dc 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -78,4 +78,10 @@ public interface IPartitioner public Map<Token, Float> describeOwnership(List<Token> sortedTokens); public AbstractType<?> getTokenValidator(); + + /** + * Abstract type that orders the same way as DecoratedKeys provided by this partitioner. + * Used by secondary indices. + */ + public AbstractType<?> partitionOrdering(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/LocalPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index 01dc75e..2a5a16e 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -84,6 +84,11 @@ public class LocalPartitioner implements IPartitioner return comparator; } + public AbstractType<?> partitionOrdering() + { + return comparator; + } + public class LocalToken extends ComparableObjectToken<ByteBuffer> { static final long serialVersionUID = 8437543776403014875L; http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index 003879c..d68be3f 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PreHashedDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.PartitionerDefinedOrder; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -45,6 +46,7 @@ public class Murmur3Partitioner implements IPartitioner private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(MINIMUM); public static final Murmur3Partitioner instance = new Murmur3Partitioner(); + public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance); public DecoratedKey decorateKey(ByteBuffer key) { @@ -288,4 +290,9 @@ public class Murmur3Partitioner implements IPartitioner { return LongType.instance; } + + public AbstractType<?> partitionOrdering() + { + return partitionOrdering; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index ae0326f..45c2cfa 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -240,4 +240,9 @@ public class OrderPreservingPartitioner implements IPartitioner { return UTF8Type.instance; } + + public AbstractType<?> partitionOrdering() + { + return UTF8Type.instance; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/RandomPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index 71a0a99..b0dea01 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.PartitionerDefinedOrder; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.GuidGenerator; @@ -47,6 +48,7 @@ public class RandomPartitioner implements IPartitioner private static final int HEAP_SIZE = (int) ObjectSizes.measureDeep(new BigIntegerToken(FBUtilities.hashToBigInteger(ByteBuffer.allocate(1)))); public static final RandomPartitioner instance = new RandomPartitioner(); + public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance); public DecoratedKey decorateKey(ByteBuffer key) { @@ -196,4 +198,9 @@ public class RandomPartitioner implements IPartitioner { return IntegerType.instance; } + + public AbstractType<?> partitionOrdering() + { + return partitionOrdering; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 68c8a11..e7624c3 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -333,7 +333,7 @@ public class RangeStreamer Collection<Range<Token>> ranges = entry.getValue().getValue(); // filter out already streamed ranges - Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner()); + Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner); if (ranges.removeAll(availableRanges)) { logger.info("Some ranges of {} are already available. Skipping streaming those ranges.", availableRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java index dd3a02b..b4281ce 100644 --- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java +++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java @@ -33,7 +33,6 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.AbstractReplicationStrategy; @@ -49,12 +48,11 @@ public class TokenAllocation public static Collection<Token> allocateTokens(final TokenMetadata tokenMetadata, final AbstractReplicationStrategy rs, - final IPartitioner partitioner, final InetAddress endpoint, int numTokens) { StrategyAdapter strategy = getStrategy(tokenMetadata, rs, endpoint); - Collection<Token> tokens = create(tokenMetadata, strategy, partitioner).addUnit(endpoint, numTokens); + Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens); tokens = adjustForCrossDatacenterClashes(tokenMetadata, strategy, tokens); if (logger.isWarnEnabled()) @@ -141,7 +139,7 @@ public class TokenAllocation return stat; } - static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy, IPartitioner partitioner) + static TokenAllocator<InetAddress> create(TokenMetadata tokenMetadata, StrategyAdapter strategy) { NavigableMap<Token, InetAddress> sortedTokens = new TreeMap<>(); for (Map.Entry<Token, InetAddress> en : tokenMetadata.getNormalAndBootstrappingTokenToEndpointMap().entrySet()) @@ -149,7 +147,7 @@ public class TokenAllocation if (strategy.inAllocationRing(en.getValue())) sortedTokens.put(en.getKey(), en.getValue()); } - return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner); + return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, tokenMetadata.partitioner); } interface StrategyAdapter extends ReplicationStrategy<InetAddress> http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index e61a35a..5fa402a 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -584,7 +584,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean JVMStabilityInspector.inspectThrowable(th); // TODO this is broken logger.warn("Unable to calculate tokens for {}. Will use a random one", address); - tokens = Collections.singletonList(StorageService.getPartitioner().getRandomToken()); + tokens = Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken()); } int generation = epState.getHeartBeatState().getGeneration(); int heartbeat = epState.getHeartBeatState().getHeartBeatVersion(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java index acc9141..f4b4da8 100644 --- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.io.Closeable; +import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -30,7 +31,6 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.Pair; @@ -46,12 +46,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat(); protected static AtomicInteger generation = new AtomicInteger(0); - protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns) + protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) { this.metadata = metadata; this.directory = directory; this.columns = columns; - DatabaseDescriptor.setPartitioner(partitioner); } protected void setSSTableFormatType(SSTableFormat.Type type) @@ -103,6 +102,11 @@ abstract class AbstractSSTableSimpleWriter implements Closeable return maxGen; } + PartitionUpdate getUpdateFor(ByteBuffer key) throws IOException + { + return getUpdateFor(metadata.decorateKey(key)); + } + /** * Returns a PartitionUpdate suitable to write on this writer for the provided key. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java index 43e214b..7ae5651 100644 --- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java @@ -77,6 +77,9 @@ public class CQLSSTableWriter implements Closeable static { Config.setClientMode(true); + // Partitioner is not set in client mode. + if (DatabaseDescriptor.getPartitioner() == null) + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); } private final AbstractSSTableSimpleWriter writer; @@ -219,10 +222,7 @@ public class CQLSSTableWriter implements Closeable try { for (ByteBuffer key : keys) - { - DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key); - insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params); - } + insert.addUpdateForKey(writer.getUpdateFor(key), clustering, params); return this; } catch (SSTableSimpleUnsortedWriter.SyncException e) @@ -277,7 +277,6 @@ public class CQLSSTableWriter implements Closeable public static class Builder { private File directory; - private IPartitioner partitioner = Murmur3Partitioner.instance; protected SSTableFormat.Type formatType = null; @@ -402,7 +401,7 @@ public class CQLSSTableWriter implements Closeable */ public Builder withPartitioner(IPartitioner partitioner) { - this.partitioner = partitioner; + this.schema = schema.copy(partitioner); return this; } @@ -511,8 +510,8 @@ public class CQLSSTableWriter implements Closeable throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()"); AbstractSSTableSimpleWriter writer = sorted - ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB); + ? new SSTableSimpleWriter(directory, schema, insert.updatedColumns()) + : new SSTableSimpleUnsortedWriter(directory, schema, insert.updatedColumns(), bufferSizeInMB); if (formatType != null) writer.setSSTableFormatType(formatType); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index 5de2452..8fb300b 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -23,8 +23,10 @@ import java.io.IOException; import com.google.common.collect.AbstractIterator; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -80,11 +82,13 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close } private final In in; + private final IPartitioner partitioner; - public KeyIterator(Descriptor desc) + public KeyIterator(Descriptor desc, CFMetaData metadata) { in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX))); + partitioner = metadata.partitioner; } protected DecoratedKey computeNext() @@ -94,7 +98,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close if (in.isEOF()) return endOfData(); - DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in.get())); + DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get())); RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the entry return key; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java index f1e01f2..bbc56cc 100644 --- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java @@ -40,7 +40,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey> { iters = new ArrayList<>(sstables.size()); for (SSTableReader sstable : sstables) - iters.add(new KeyIterator(sstable.descriptor)); + iters.add(new KeyIterator(sstable.descriptor, sstable.metadata)); } private void maybeInit() http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/SSTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java index 516534d..b86d9b4 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTable.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java @@ -18,6 +18,7 @@ package org.apache.cassandra.io.sstable; import java.io.*; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.CopyOnWriteArraySet; @@ -63,31 +64,29 @@ public abstract class SSTable public final Descriptor descriptor; protected final Set<Component> components; public final CFMetaData metadata; - public final IPartitioner partitioner; public final boolean compression; public DecoratedKey first; public DecoratedKey last; - protected SSTable(Descriptor descriptor, CFMetaData metadata, IPartitioner partitioner) + protected SSTable(Descriptor descriptor, CFMetaData metadata) { - this(descriptor, new HashSet<>(), metadata, partitioner); + this(descriptor, new HashSet<>(), metadata); } - protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) + protected SSTable(Descriptor descriptor, Set<Component> components, CFMetaData metadata) { // In almost all cases, metadata shouldn't be null, but allowing null allows to create a mostly functional SSTable without // full schema definition. SSTableLoader use that ability assert descriptor != null; assert components != null; - assert partitioner != null; + assert metadata != null; this.descriptor = descriptor; Set<Component> dataComponents = new HashSet<>(components); this.compression = dataComponents.contains(Component.COMPRESSION_INFO); this.components = new CopyOnWriteArraySet<>(dataComponents); this.metadata = metadata; - this.partitioner = partitioner; } /** @@ -121,6 +120,16 @@ public abstract class SSTable return true; } + public IPartitioner getPartitioner() + { + return metadata.partitioner; + } + + public DecoratedKey decorateKey(ByteBuffer key) + { + return getPartitioner().decorateKey(key); + } + /** * If the given @param key occupies only part of a larger buffer, allocate a new buffer that is only * as large as necessary. http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index f25d3ff..20c3962 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -133,7 +133,7 @@ public class SSTableLoader implements StreamEventHandler // To conserve memory, open SSTableReaders without bloom filters and discard // the index summary after calculating the file sections to stream and the estimated // number of keys for each endpoint. See CASSANDRA-5555 for details. - SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata, client.getPartitioner()); + SSTableReader sstable = SSTableReader.openForBatch(desc, components, metadata); sstables.add(sstable); // calculate the sstable sections to stream as well as the estimated number of @@ -252,7 +252,6 @@ public class SSTableLoader implements StreamEventHandler public static abstract class Client { private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<>(); - private IPartitioner partitioner; /** * Initialize the client. @@ -299,23 +298,6 @@ public class SSTableLoader implements StreamEventHandler return endpointToRanges; } - protected void setPartitioner(String partclass) throws ConfigurationException - { - setPartitioner(FBUtilities.newPartitioner(partclass)); - } - - protected void setPartitioner(IPartitioner partitioner) - { - this.partitioner = partitioner; - // the following is still necessary since Range/Token reference partitioner through StorageService.getPartitioner - DatabaseDescriptor.setPartitioner(partitioner); - } - - public IPartitioner getPartitioner() - { - return partitioner; - } - protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint) { Collection<Range<Token>> ranges = endpointToRanges.get(endpoint); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index a70b92f..f4b9adf 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -33,7 +33,6 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredSerializer; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.JVMStabilityInspector; /** @@ -60,9 +59,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>(); private final DiskWriter diskWriter = new DiskWriter(); - SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB) + SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, PartitionColumns columns, long bufferSizeInMB) { - super(directory, metadata, partitioner, columns); + super(directory, metadata, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; this.header = new SerializationHeader(metadata, columns, EncodingStats.NO_STATS); diskWriter.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java index b22a048..45722cd 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java @@ -44,9 +44,9 @@ class SSTableSimpleWriter extends AbstractSSTableSimpleWriter private SSTableTxnWriter writer; - protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns) + protected SSTableSimpleWriter(File directory, CFMetaData metadata, PartitionColumns columns) { - super(directory, metadata, partitioner, columns); + super(directory, metadata, columns); } private SSTableTxnWriter getOrCreateWriter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 6d39d2d..5ceced5 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -27,10 +27,11 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; @@ -42,27 +43,23 @@ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.lifecycle.TransactionLogs; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.io.sstable.format.big.BigTableWriter; import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.SelfRefCounted; @@ -351,21 +348,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException { - IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR) - ? new LocalPartitioner(metadata.getKeyValidator()) - : StorageService.getPartitioner(); - return open(desc, componentsFor(desc), metadata, p); + return open(desc, componentsFor(desc), metadata); } - public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException + public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException { - return open(descriptor, components, metadata, partitioner, true, true); + return open(descriptor, components, metadata, true, true); } // use only for offline or "Standalone" operations public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, ColumnFamilyStore cfs) throws IOException { - return open(descriptor, components, cfs.metadata, cfs.partitioner, false, false); // do not track hotness + return open(descriptor, components, cfs.metadata, false, false); // do not track hotness } /** @@ -374,11 +368,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS * @param descriptor * @param components * @param metadata - * @param partitioner * @return opened SSTableReader * @throws IOException */ - public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException + public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException { // Minimum components without which we can't do anything assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor; @@ -394,7 +387,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. // In that case, we skip the check. - String partitionerName = partitioner.getClass().getCanonicalName(); + String partitionerName = metadata.partitioner.getClass().getCanonicalName(); if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) { logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", @@ -406,7 +399,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable = internalOpen(descriptor, components, metadata, - partitioner, System.currentTimeMillis(), statsMetadata, OpenReason.NORMAL, @@ -431,7 +423,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, - IPartitioner partitioner, boolean validate, boolean trackHotness) throws IOException { @@ -452,7 +443,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // Check if sstable is created using same partitioner. // Partitioner can be null, which indicates older version of sstable or no stats available. // In that case, we skip the check. - String partitionerName = partitioner.getClass().getCanonicalName(); + String partitionerName = metadata.partitioner.getClass().getCanonicalName(); if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner)) { logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s. Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.", @@ -464,7 +455,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable = internalOpen(descriptor, components, metadata, - partitioner, System.currentTimeMillis(), statsMetadata, OpenReason.NORMAL, @@ -502,8 +492,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries, - final CFMetaData metadata, - final IPartitioner partitioner) + final CFMetaData metadata) { final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>(); @@ -517,7 +506,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader sstable; try { - sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner); + sstable = open(entry.getKey(), entry.getValue(), metadata); } catch (CorruptSSTableException ex) { @@ -562,7 +551,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public static SSTableReader internalOpen(Descriptor desc, Set<Component> components, CFMetaData metadata, - IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, @@ -572,9 +560,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS OpenReason openReason, SerializationHeader header) { - assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; + assert desc != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null; - SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); + SSTableReader reader = internalOpen(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header); reader.bf = bf; reader.ifile = ifile; @@ -589,7 +577,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private static SSTableReader internalOpen(final Descriptor descriptor, Set<Component> components, CFMetaData metadata, - IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, @@ -597,19 +584,18 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { Factory readerFactory = descriptor.getFormat().getReaderFactory(); - return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); + return readerFactory.open(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header); } protected SSTableReader(final Descriptor desc, Set<Component> components, CFMetaData metadata, - IPartitioner partitioner, long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) { - super(desc, components, metadata, partitioner); + super(desc, components, metadata); this.sstableMetadata = sstableMetadata; this.header = header; this.maxDataAge = maxDataAge; @@ -814,7 +800,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex); RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex); - DecoratedKey decoratedKey = partitioner.decorateKey(key); + DecoratedKey decoratedKey = decorateKey(key); if (first == null) first = decoratedKey; last = decoratedKey; @@ -832,7 +818,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS } if (!summaryLoaded) - indexSummary = summaryBuilder.build(partitioner); + indexSummary = summaryBuilder.build(getPartitioner()); } } @@ -862,10 +848,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { iStream = new DataInputStream(new FileInputStream(summariesFile)); indexSummary = IndexSummary.serializer.deserialize( - iStream, partitioner, descriptor.version.hasSamplingLevel(), + iStream, getPartitioner(), descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); - first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); - last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream)); + first = decorateKey(ByteBufferUtil.readWithLength(iStream)); + last = decorateKey(ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); dbuilder.deserializeBounds(iStream); } @@ -1064,7 +1050,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SSTableReader replacement = internalOpen(descriptor, components, metadata, - partitioner, ifile != null ? ifile.sharedCopy() : null, dfile.sharedCopy(), newSummary, @@ -1168,7 +1153,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS else if (samplingLevel < indexSummary.getSamplingLevel()) { // we can use the existing index summary to make a smaller one - newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner); + newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, getPartitioner()); try(SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), compression)) @@ -1203,11 +1188,11 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS long indexPosition; while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) { - summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition); + summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition); RowIndexEntry.Serializer.skip(primaryIndex); } - return summaryBuilder.build(partitioner); + return summaryBuilder.build(getPartitioner()); } } finally @@ -1304,8 +1289,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata(); - //We need the parent cf metadata - String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName; + // We need the parent cf metadata + String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName; cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName)); return cmd; @@ -1477,7 +1462,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public DecoratedKey next() { byte[] bytes = indexSummary.getKey(idx++); - return partitioner.decorateKey(ByteBuffer.wrap(bytes)); + return decorateKey(ByteBuffer.wrap(bytes)); } public void remove() @@ -1619,7 +1604,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS while (!in.isEOF()) { ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in); - DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + DecoratedKey indexDecoratedKey = decorateKey(indexKey); if (indexDecoratedKey.compareTo(token) > 0) return indexDecoratedKey; @@ -2300,7 +2285,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public abstract SSTableReader open(final Descriptor descriptor, Set<Component> components, CFMetaData metadata, - IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 08a9dcc..fa691b8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -27,13 +27,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.rows.UnfilteredRowIterator; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTable; @@ -74,11 +72,10 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, CFMetaData metadata, - IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header) { - super(descriptor, components(metadata), metadata, partitioner); + super(descriptor, components(metadata), metadata); this.keyCount = keyCount; this.repairedAt = repairedAt; this.metadataCollector = metadataCollector; @@ -90,19 +87,18 @@ public abstract class SSTableWriter extends SSTable implements Transactional Long keyCount, Long repairedAt, CFMetaData metadata, - IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { Factory writerFactory = descriptor.getFormat().getWriterFactory(); - return writerFactory.open(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn); + return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); } public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn) { CFMetaData metadata = Schema.instance.getCFMetaData(descriptor); - return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, DatabaseDescriptor.getPartitioner(), header, txn); + return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn); } public static SSTableWriter create(CFMetaData metadata, @@ -110,12 +106,11 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, int sstableLevel, - IPartitioner partitioner, SerializationHeader header, LifecycleTransaction txn) { MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); - return create(descriptor, keyCount, repairedAt, metadata, partitioner, collector, header, txn); + return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn); } public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn) @@ -255,7 +250,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected Map<MetadataType, MetadataComponent> finalizeMetadata() { - return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(), + return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(), metadata.getBloomFilterFpChance(), repairedAt, header); @@ -287,7 +282,6 @@ public abstract class SSTableWriter extends SSTable implements Transactional long keyCount, long repairedAt, CFMetaData metadata, - IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index a072d4d..860cd9f 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -23,7 +23,6 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableFormat; @@ -86,21 +85,20 @@ public class BigFormat implements SSTableFormat long keyCount, long repairedAt, CFMetaData metadata, - IPartitioner partitioner, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, partitioner, metadataCollector, header, txn); + return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn); } } static class ReaderFactory extends SSTableReader.Factory { @Override - public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header) + public SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, SSTableReader.OpenReason openReason, SerializationHeader header) { - return new BigTableReader(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); + return new BigTableReader(descriptor, components, metadata, maxDataAge, sstableMetadata, openReason, header); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index b539c79..87608fd 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -52,9 +52,9 @@ public class BigTableReader extends SSTableReader { private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class); - BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, IPartitioner partitioner, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) + BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header) { - super(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason, header); + super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header); } public SliceableUnfilteredRowIterator iterator(DecoratedKey key, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift) @@ -201,7 +201,7 @@ public class BigTableReader extends SSTableReader } else { - DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey); + DecoratedKey indexDecoratedKey = decorateKey(indexKey); int comparison = indexDecoratedKey.compareTo(key); int v = op.apply(comparison); opSatisfied = (v == 0); @@ -227,7 +227,7 @@ public class BigTableReader extends SSTableReader // expensive sanity check! see CASSANDRA-4687 try (FileDataInput fdi = dfile.getSegment(indexEntry.position)) { - DecoratedKey keyInDisk = partitioner.decorateKey(ByteBufferUtil.readWithShortLength(fdi)); + DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi)); if (!keyInDisk.equals(key)) throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath())); }
