Stop accessing the partitioner directly via StorageService

Changes the way the partitioner is accessed. The partitioner is now stored and accessed according to the reason for needing it:
 * if it is to access a table / prepare a statement, the partitioner relevant to this table can be found in its CFMetaData.
 * if it is to route messages to the relevant member of the cluster, the partitioner in TokenMetadata is to be used.

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for CASSANDRA-8143 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/69f77cbd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/69f77cbd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/69f77cbd Branch: refs/heads/trunk Commit: 69f77cbddd4c74448f227e9aceef84d345118184 Parents: df3b602 Author: Branimir Lambov <branimir.lam...@datastax.com> Authored: Fri Jul 3 14:38:40 2015 +0100 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Jul 31 14:42:47 2015 +0300 ---------------------------------------------------------------------- .../org/apache/cassandra/config/CFMetaData.java | 64 +++++++- .../cassandra/config/DatabaseDescriptor.java | 7 +- .../org/apache/cassandra/config/Schema.java | 2 +- .../apache/cassandra/cql3/TokenRelation.java | 7 +- .../cassandra/cql3/functions/TokenFct.java | 10 +- .../restrictions/StatementRestrictions.java | 2 +- .../cql3/restrictions/TokenFilter.java | 14 +- .../cql3/restrictions/TokenRestriction.java | 23 +-- .../cql3/statements/BatchStatement.java | 5 +- .../cql3/statements/ModificationStatement.java | 7 +- .../cql3/statements/SelectStatement.java | 2 +- .../db/AbstractReadCommandBuilder.java | 11 +- .../apache/cassandra/db/BatchlogManager.java | 7 +- .../apache/cassandra/db/ColumnFamilyStore.java | 35 ++-- src/java/org/apache/cassandra/db/DataRange.java | 3 +- .../cassandra/db/HintedHandOffManager.java | 29 ++-- src/java/org/apache/cassandra/db/Memtable.java | 1 - src/java/org/apache/cassandra/db/Mutation.java | 26 ++- .../apache/cassandra/db/PartitionPosition.java | 2 +- .../cassandra/db/PartitionRangeReadCommand.java | 2 +- .../apache/cassandra/db/RowUpdateBuilder.java | 2 +- .../db/SinglePartitionNamesCommand.java | 12 ++ .../db/SinglePartitionReadCommand.java | 18 ++- .../db/SinglePartitionSliceCommand.java | 17 ++ .../org/apache/cassandra/db/SystemKeyspace.java | 18 
+-- .../db/compaction/CompactionManager.java | 4 +- .../db/compaction/LeveledManifest.java | 2 +- .../cassandra/db/compaction/Scrubber.java | 4 +- .../cassandra/db/compaction/Upgrader.java | 1 - .../cassandra/db/compaction/Verifier.java | 2 +- .../writers/DefaultCompactionWriter.java | 1 - .../writers/MajorLeveledCompactionWriter.java | 2 - .../writers/MaxSSTableSizeWriter.java | 2 - .../SplittingSizeTieredCompactionWriter.java | 2 - .../AbstractSimplePerColumnSecondaryIndex.java | 4 +- .../cassandra/db/index/SecondaryIndex.java | 20 +-- .../db/index/composites/CompositesIndex.java | 2 +- .../CompositesIndexOnClusteringKey.java | 3 +- .../db/index/composites/CompositesSearcher.java | 2 +- .../cassandra/db/index/keys/KeysIndex.java | 3 +- .../cassandra/db/index/keys/KeysSearcher.java | 2 +- .../db/marshal/LocalByPartionerType.java | 97 ------------ .../db/marshal/PartitionerDefinedOrder.java | 91 +++++++++++ .../db/partitions/AtomicBTreePartition.java | 4 +- .../db/partitions/PartitionUpdate.java | 108 ++++++++++--- .../rows/UnfilteredRowIteratorSerializer.java | 3 +- .../cassandra/db/view/MaterializedView.java | 7 +- .../apache/cassandra/db/view/TemporalRow.java | 2 +- .../org/apache/cassandra/dht/BootStrapper.java | 12 +- .../cassandra/dht/ByteOrderedPartitioner.java | 5 + .../org/apache/cassandra/dht/IPartitioner.java | 6 + .../apache/cassandra/dht/LocalPartitioner.java | 5 + .../cassandra/dht/Murmur3Partitioner.java | 7 + .../dht/OrderPreservingPartitioner.java | 5 + .../apache/cassandra/dht/RandomPartitioner.java | 7 + .../org/apache/cassandra/dht/RangeStreamer.java | 2 +- .../dht/tokenallocator/TokenAllocation.java | 8 +- src/java/org/apache/cassandra/gms/Gossiper.java | 2 +- .../io/sstable/AbstractSSTableSimpleWriter.java | 10 +- .../cassandra/io/sstable/CQLSSTableWriter.java | 15 +- .../cassandra/io/sstable/KeyIterator.java | 8 +- .../io/sstable/ReducingKeyIterator.java | 2 +- .../apache/cassandra/io/sstable/SSTable.java | 21 ++- 
.../cassandra/io/sstable/SSTableLoader.java | 20 +-- .../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +- .../io/sstable/SSTableSimpleWriter.java | 4 +- .../io/sstable/format/SSTableReader.java | 74 ++++----- .../io/sstable/format/SSTableWriter.java | 16 +- .../io/sstable/format/big/BigFormat.java | 8 +- .../io/sstable/format/big/BigTableReader.java | 8 +- .../io/sstable/format/big/BigTableScanner.java | 6 +- .../io/sstable/format/big/BigTableWriter.java | 15 +- .../apache/cassandra/locator/TokenMetadata.java | 32 +++- .../apache/cassandra/net/MessagingService.java | 6 +- .../repair/RepairMessageVerbHandler.java | 4 +- .../cassandra/schema/LegacySchemaMigrator.java | 16 +- .../apache/cassandra/schema/SchemaKeyspace.java | 33 ++-- .../apache/cassandra/service/CacheService.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 11 +- .../cassandra/service/StorageService.java | 57 ++++--- .../service/pager/RangeNamesQueryPager.java | 4 +- .../service/pager/RangeSliceQueryPager.java | 3 +- .../apache/cassandra/service/paxos/Commit.java | 5 +- .../cassandra/streaming/StreamReader.java | 2 +- .../cassandra/thrift/CassandraServer.java | 48 +++--- .../cassandra/thrift/ThriftConversion.java | 4 +- .../cassandra/thrift/ThriftValidation.java | 3 +- .../utils/NativeSSTableLoaderClient.java | 17 +- .../io/sstable/CQLSSTableWriterLongTest.java | 1 - test/unit/org/apache/cassandra/MockSchema.java | 9 +- .../org/apache/cassandra/UpdateBuilder.java | 2 +- test/unit/org/apache/cassandra/Util.java | 49 ++++-- .../apache/cassandra/config/CFMetaDataTest.java | 1 - .../cassandra/cql3/IndexQueryPagingTest.java | 3 - .../selection/SelectionColumnMappingTest.java | 2 +- .../entities/FrozenCollectionsTest.java | 5 +- .../cql3/validation/entities/JsonTest.java | 2 +- .../SecondaryIndexOnMapEntriesTest.java | 2 +- .../cql3/validation/entities/UserTypesTest.java | 5 +- .../validation/operations/SelectLimitTest.java | 2 +- .../SelectOrderedPartitionerTest.java | 2 +- 
.../cassandra/db/BatchlogManagerTest.java | 7 +- .../org/apache/cassandra/db/RowCacheTest.java | 4 +- .../unit/org/apache/cassandra/db/ScrubTest.java | 63 ++------ .../org/apache/cassandra/db/VerifyTest.java | 4 +- .../cassandra/db/compaction/TTLExpiryTest.java | 2 +- .../db/lifecycle/RealTransactionsTest.java | 2 - .../db/lifecycle/TransactionLogsTest.java | 4 +- .../apache/cassandra/dht/BootStrapperTest.java | 11 +- .../apache/cassandra/dht/KeyCollisionTest.java | 134 +--------------- .../apache/cassandra/dht/LengthPartitioner.java | 158 +++++++++++++++++++ .../cassandra/dht/PartitionerTestCase.java | 2 +- .../cassandra/gms/SerializationsTest.java | 12 +- .../io/sstable/BigTableWriterTest.java | 1 - .../io/sstable/CQLSSTableWriterTest.java | 130 ++++++++------- .../cassandra/io/sstable/IndexSummaryTest.java | 15 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 +- .../cassandra/io/sstable/SSTableLoaderTest.java | 3 - .../cassandra/io/sstable/SSTableReaderTest.java | 32 ++-- .../io/sstable/SSTableRewriterTest.java | 2 +- .../apache/cassandra/repair/ValidatorTest.java | 9 +- .../service/ActiveRepairServiceTest.java | 6 +- .../service/LeaveAndBootstrapTest.java | 8 +- .../cassandra/service/SerializationsTest.java | 29 +++- .../cassandra/service/StorageProxyTest.java | 4 +- .../streaming/StreamingTransferTest.java | 12 +- .../apache/cassandra/utils/MerkleTreeTest.java | 2 +- .../cassandra/utils/SerializationsTest.java | 24 ++- 128 files changed, 1103 insertions(+), 878 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 902b1d2..ffb7b5e 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ 
b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -45,6 +45,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.io.compress.CompressionParameters; import org.apache.cassandra.io.compress.LZ4Compressor; @@ -183,7 +184,10 @@ public final class CFMetaData private final boolean isCounter; private final boolean isMaterializedView; + private final boolean isIndex; + public volatile ClusteringComparator comparator; // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns + public final IPartitioner partitioner; // partitioner the table uses private final Serializers serializers; @@ -259,7 +263,8 @@ public final class CFMetaData boolean isMaterializedView, List<ColumnDefinition> partitionKeyColumns, List<ColumnDefinition> clusteringColumns, - PartitionColumns partitionColumns) + PartitionColumns partitionColumns, + IPartitioner partitioner) { this.cfId = cfId; this.ksName = keyspace; @@ -284,6 +289,11 @@ public final class CFMetaData flags.add(Flag.MATERIALIZEDVIEW); this.flags = Sets.immutableEnumSet(flags); + isIndex = cfName.contains("."); + + assert partitioner != null; + this.partitioner = partitioner; + // A compact table should always have a clustering assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns); @@ -329,7 +339,8 @@ public final class CFMetaData boolean isSuper, boolean isCounter, boolean isMaterializedView, - List<ColumnDefinition> columns) + List<ColumnDefinition> columns, + IPartitioner partitioner) { List<ColumnDefinition> partitions = new ArrayList<>(); List<ColumnDefinition> clusterings = new ArrayList<>(); @@ -364,7 +375,8 @@ public final class 
CFMetaData isMaterializedView, partitions, clusterings, - builder.build()); + builder.build(), + partitioner); } private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns) @@ -466,7 +478,25 @@ public final class CFMetaData isMaterializedView(), copy(partitionKeyColumns), copy(clusteringColumns), - copy(partitionColumns)), + copy(partitionColumns), + partitioner), + this); + } + + public CFMetaData copy(IPartitioner partitioner) + { + return copyOpts(new CFMetaData(ksName, + cfName, + cfId, + isSuper, + isCounter, + isDense, + isCompound, + isMaterializedView, + copy(partitionKeyColumns), + copy(clusteringColumns), + copy(partitionColumns), + partitioner), this); } @@ -537,6 +567,19 @@ public final class CFMetaData return cfName.contains("."); } + /** + * true if this CFS contains secondary index data. + */ + public boolean isIndex() + { + return isIndex; + } + + public DecoratedKey decorateKey(ByteBuffer key) + { + return partitioner.decorateKey(key); + } + public Map<ByteBuffer, ColumnDefinition> getColumnMetadata() { return columnMetadata; @@ -548,7 +591,7 @@ public final class CFMetaData */ public String getParentColumnFamilyName() { - return isSecondaryIndex() ? cfName.substring(0, cfName.indexOf('.')) : null; + return isIndex ? 
cfName.substring(0, cfName.indexOf('.')) : null; } public double getReadRepairChance() @@ -1392,6 +1435,7 @@ public final class CFMetaData private final boolean isSuper; private final boolean isCounter; private final boolean isMaterializedView; + private IPartitioner partitioner; private UUID tableId; @@ -1409,6 +1453,7 @@ public final class CFMetaData this.isSuper = isSuper; this.isCounter = isCounter; this.isMaterializedView = isMaterializedView; + this.partitioner = DatabaseDescriptor.getPartitioner(); } public static Builder create(String keyspace, String table) @@ -1441,6 +1486,12 @@ public final class CFMetaData return create(keyspace, table, false, false, true, isCounter); } + public Builder withPartitioner(IPartitioner partitioner) + { + this.partitioner = partitioner; + return this; + } + public Builder withId(UUID tableId) { this.tableId = tableId; @@ -1554,7 +1605,8 @@ public final class CFMetaData isMaterializedView, partitions, clusterings, - builder.build()); + builder.build(), + partitioner); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index d32af4d..3ec21d7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -18,7 +18,6 @@ package org.apache.cassandra.config; import java.io.File; -import java.io.IOException; import java.net.*; import java.util.*; @@ -743,10 +742,12 @@ public class DatabaseDescriptor return paritionerName; } - /* For tests ONLY, don't use otherwise or all hell will break loose */ - public static void setPartitioner(IPartitioner newPartitioner) + /* For tests ONLY, don't use otherwise or all hell will break loose. Tests should restore value at the end. 
*/ + public static IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner) { + IPartitioner old = partitioner; partitioner = newPartitioner; + return old; } public static IEndpointSnitch getEndpointSnitch() http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index c934327..e1e7380 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -612,7 +612,7 @@ public class Schema MigrationManager.instance.notifyDropAggregate(uda); } - private KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation) + private synchronized KeyspaceMetadata update(String keyspaceName, java.util.function.Function<KeyspaceMetadata, KeyspaceMetadata> transformation) { KeyspaceMetadata current = getKSMetaData(keyspaceName); if (current == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/TokenRelation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/TokenRelation.java b/src/java/org/apache/cassandra/cql3/TokenRelation.java index 14bd5e0..e0b71fa 100644 --- a/src/java/org/apache/cassandra/cql3/TokenRelation.java +++ b/src/java/org/apache/cassandra/cql3/TokenRelation.java @@ -30,7 +30,6 @@ import org.apache.cassandra.cql3.restrictions.Restriction; import org.apache.cassandra.cql3.restrictions.TokenRestriction; import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.cql3.statements.RequestValidations.checkContainsNoDuplicates; import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkContainsOnly; @@ -69,7 +68,7 @@ public final class TokenRelation extends Relation { List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm); Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames); - return new TokenRestriction.EQRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, term); + return new TokenRestriction.EQRestriction(cfm, columnDefs, term); } @Override @@ -86,7 +85,7 @@ public final class TokenRelation extends Relation { List<ColumnDefinition> columnDefs = getColumnDefinitions(cfm); Term term = toTerm(toReceivers(cfm, columnDefs), value, cfm.ksName, boundNames); - return new TokenRestriction.SliceRestriction(cfm.getKeyValidatorAsClusteringComparator(), columnDefs, bound, inclusive, term); + return new TokenRestriction.SliceRestriction(cfm, columnDefs, bound, inclusive, term); } @Override @@ -159,6 +158,6 @@ public final class TokenRelation extends Relation return Collections.singletonList(new ColumnSpecification(firstColumn.ksName, firstColumn.cfName, new ColumnIdentifier("partition key token", true), - StorageService.getPartitioner().getTokenValidator())); + cfm.partitioner.getTokenValidator())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/functions/TokenFct.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java index c76b588..283ac0b 100644 --- a/src/java/org/apache/cassandra/cql3/functions/TokenFct.java +++ b/src/java/org/apache/cassandra/cql3/functions/TokenFct.java @@ -22,23 +22,17 @@ import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.CBuilder; import 
org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.StorageService; public class TokenFct extends NativeScalarFunction { - // The actual token function depends on the partitioner used - private static final IPartitioner partitioner = StorageService.getPartitioner(); - private final CFMetaData cfm; public TokenFct(CFMetaData cfm) { - super("token", partitioner.getTokenValidator(), getKeyTypes(cfm)); + super("token", cfm.partitioner.getTokenValidator(), getKeyTypes(cfm)); this.cfm = cfm; } @@ -61,6 +55,6 @@ public class TokenFct extends NativeScalarFunction return null; builder.add(bb); } - return partitioner.getTokenFactory().toByteArray(partitioner.getToken(CFMetaData.serializePartitionKey(builder.build()))); + return cfm.partitioner.getTokenFactory().toByteArray(cfm.partitioner.getToken(CFMetaData.serializePartitionKey(builder.build()))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index d9fd5e4..ea87db7 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -378,7 +378,7 @@ public final class StatementRestrictions */ public AbstractBounds<PartitionPosition> getPartitionKeyBounds(QueryOptions options) throws InvalidRequestException { - IPartitioner p = StorageService.getPartitioner(); + IPartitioner p = cfm.partitioner; if (partitionKeyRestrictions.isOnToken()) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java index bf3f2f6..3258b26 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java @@ -31,7 +31,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.cql3.statements.Bound.END; import static org.apache.cassandra.cql3.statements.Bound.START; @@ -52,9 +51,9 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions private TokenRestriction tokenRestriction; /** - * The partitioner + * Partitioner to manage tokens, extracted from tokenRestriction metadata. */ - private static final IPartitioner partitioner = StorageService.getPartitioner(); + private final IPartitioner partitioner; @Override protected PrimaryKeyRestrictions getDelegate() @@ -74,6 +73,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions { this.restrictions = restrictions; this.tokenRestriction = tokenRestriction; + this.partitioner = tokenRestriction.metadata.partitioner; } @Override @@ -144,7 +144,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param values the restricted values * @return the values for which the tokens are not included within the specified range. 
*/ - private static List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values) + private List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values) { List<ByteBuffer> remaining = new ArrayList<>(); @@ -166,7 +166,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param buffers the token restriction values * @return the range set corresponding to the specified list */ - private static RangeSet<Token> toRangeSet(List<ByteBuffer> buffers) + private RangeSet<Token> toRangeSet(List<ByteBuffer> buffers) { ImmutableRangeSet.Builder<Token> builder = ImmutableRangeSet.builder(); @@ -184,7 +184,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @return the range set corresponding to the specified slice * @throws InvalidRequestException if the request is invalid */ - private static RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException + private RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException { if (slice.hasBound(START)) { @@ -224,7 +224,7 @@ final class TokenFilter extends ForwardingPrimaryKeyRestrictions * @param buffer the buffer * @return the token corresponding to the specified buffer */ - private static Token deserializeToken(ByteBuffer buffer) + private Token deserializeToken(ByteBuffer buffer) { return partitioner.getTokenFactory().fromByteArray(buffer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java index 0a7721a..56da6da 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java +++ 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.base.Joiner; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.Term; @@ -44,16 +45,18 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions */ protected final List<ColumnDefinition> columnDefs; + final CFMetaData metadata; + /** * Creates a new <code>TokenRestriction</code> that apply to the specified columns. * - * @param comparator the clustering comparator * @param columnDefs the definition of the columns to which apply the token restriction */ - public TokenRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs) + public TokenRestriction(CFMetaData metadata, List<ColumnDefinition> columnDefs) { - super(comparator); + super(metadata.getKeyValidatorAsClusteringComparator()); this.columnDefs = columnDefs; + this.metadata = metadata; } @Override @@ -154,9 +157,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { private final Term value; - public EQRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Term value) + public EQRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Term value) { - super(comparator, columnDefs); + super(cfm, columnDefs); this.value = value; } @@ -190,9 +193,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { private final TermSlice slice; - public SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term) + public SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, Bound bound, boolean inclusive, Term term) { - super(comparator, columnDefs); + super(cfm, columnDefs); slice = TermSlice.newInstance(bound, inclusive, term); } @@ -250,7 +253,7 @@ 
public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions throw invalidRequest("More than one restriction was found for the end bound on %s", getColumnNamesAsString()); - return new SliceRestriction(comparator, columnDefs, slice.merge(otherSlice.slice)); + return new SliceRestriction(metadata, columnDefs, slice.merge(otherSlice.slice)); } @Override @@ -258,9 +261,9 @@ public abstract class TokenRestriction extends AbstractPrimaryKeyRestrictions { return String.format("SLICE%s", slice); } - private SliceRestriction(ClusteringComparator comparator, List<ColumnDefinition> columnDefs, TermSlice slice) + private SliceRestriction(CFMetaData cfm, List<ColumnDefinition> columnDefs, TermSlice slice) { - super(comparator, columnDefs); + super(cfm, columnDefs); this.slice = slice; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 08a47c0..5d1333c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -39,7 +39,6 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.thrift.Column; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.transport.messages.ResultMessage; @@ -260,7 +259,7 @@ public class BatchStatement implements CQLStatement for (ByteBuffer key : keys) { - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + DecoratedKey dk = statement.cfm.decorateKey(key); IMutation mutation = 
ksMap.get(dk.getKey()); Mutation mut; if (mutation == null) @@ -426,7 +425,7 @@ public class BatchStatement implements CQLStatement throw new IllegalArgumentException("Batch with conditions cannot span multiple partitions (you cannot use IN on the partition key)"); if (key == null) { - key = StorageService.getPartitioner().decorateKey(pks.get(0)); + key = statement.cfm.decorateKey(pks.get(0)); casRequest = new CQL3CasRequest(statement.cfm, key, true, conditionColumns, updatesRegularRows, updatesStaticRow); } else if (!key.getKey().equals(pks.get(0))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 2f3de4c..9f2c952 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -541,7 +541,7 @@ public abstract class ModificationStatement implements CQLStatement ColumnFilter.selection(toRead), RowFilter.NONE, DataLimits.NONE, - StorageService.getPartitioner().decorateKey(key), + key, new ClusteringIndexNamesFilter(clusterings, false))); Map<DecoratedKey, Partition> map = new HashMap(); @@ -639,7 +639,7 @@ public abstract class ModificationStatement implements CQLStatement if (keys.size() > 1) throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); - DecoratedKey key = StorageService.getPartitioner().decorateKey(keys.get(0)); + DecoratedKey key = cfm.decorateKey(keys.get(0)); long now = options.getTimestamp(queryState); CBuilder cbuilder = createClustering(options); @@ -820,8 +820,7 @@ public abstract class ModificationStatement implements CQLStatement for (ByteBuffer key: keys) { 
ThriftValidation.validateKey(cfm, key); - DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); - PartitionUpdate upd = new PartitionUpdate(cfm, dk, updatedColumns(), 1); + PartitionUpdate upd = new PartitionUpdate(cfm, key, updatedColumns(), 1); addUpdateForKey(upd, clustering, params); Mutation mut = new Mutation(upd); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 84d621b..94f04b8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -410,7 +410,7 @@ public class SelectStatement implements CQLStatement for (ByteBuffer key : keys) { QueryProcessor.validateKey(key); - DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBufferUtil.clone(key)); + DecoratedKey dk = cfm.decorateKey(ByteBufferUtil.clone(key)); commands.add(SinglePartitionReadCommand.create(cfm, nowInSec, queriedColumns, rowFilter, limit, dk, filter)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java index 2ddc6ca..5e3b726 100644 --- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java +++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java @@ -24,14 +24,9 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.*; 
-import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.*; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; public abstract class AbstractReadCommandBuilder @@ -312,13 +307,13 @@ public abstract class AbstractReadCommandBuilder PartitionPosition start = startKey; if (start == null) { - start = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + start = cfs.getPartitioner().getMinimumToken().maxKeyBound(); startInclusive = false; } PartitionPosition end = endKey; if (end == null) { - end = StorageService.getPartitioner().getMinimumToken().maxKeyBound(); + end = cfs.getPartitioner().getMinimumToken().maxKeyBound(); endInclusive = true; } @@ -341,7 +336,7 @@ public abstract class AbstractReadCommandBuilder return (DecoratedKey)partitionKey[0]; ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return StorageService.getPartitioner().decorateKey(key); + return metadata.decorateKey(key); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index 154a86b..9e90d9d 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -199,8 +199,11 @@ public class BatchlogManager implements BatchlogManagerMBean private void deleteBatch(UUID id) { - Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(id))); - 
mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); + Mutation mutation = new Mutation( + PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, + UUIDType.instance.decompose(id), + FBUtilities.timestampMicros(), + FBUtilities.nowInSeconds())); mutation.apply(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 24da365..c4377d6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -137,7 +137,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final Keyspace keyspace; public final String name; public final CFMetaData metadata; - public final IPartitioner partitioner; private final String mbeanName; @Deprecated private final String oldMBeanName; @@ -304,20 +303,18 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, - IPartitioner partitioner, int generation, CFMetaData metadata, Directories directories, boolean loadSSTables) { - this(keyspace, columnFamilyName, partitioner, generation, metadata, directories, loadSSTables, true); + this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true); } @VisibleForTesting public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, - IPartitioner partitioner, int generation, CFMetaData metadata, Directories directories, @@ -331,7 +328,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.metadata = metadata; this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold()); 
this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); - this.partitioner = partitioner; this.directories = directories; this.indexManager = new SecondaryIndexManager(this); this.materializedViewManager = new MaterializedViewManager(this); @@ -349,7 +345,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (data.loadsstables) { Directories.SSTableLister sstableFiles = directories.sstableLister().skipTemporary(true); - Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata, this.partitioner); + Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata); data.addInitialSSTables(sstables); } @@ -486,12 +482,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, boolean loadSSTables) { - return createColumnFamilyStore(keyspace, columnFamily, StorageService.getPartitioner(), Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables); + return createColumnFamilyStore(keyspace, columnFamily, Schema.instance.getCFMetaData(keyspace.getName(), columnFamily), loadSSTables); } public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, - IPartitioner partitioner, CFMetaData metadata, boolean loadSSTables) { @@ -510,7 +505,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean Collections.sort(generations); int value = (generations.size() > 0) ? 
(generations.get(generations.size() - 1)) : 0; - return new ColumnFamilyStore(keyspace, columnFamily, partitioner, value, metadata, directories, loadSSTables); + return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables); } /** @@ -681,7 +676,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean SSTableReader reader; try { - reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata, partitioner); + reader = SSTableReader.open(newDescriptor, entry.getValue(), metadata); } catch (IOException e) { @@ -1443,7 +1438,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // WARNING: this returns the set of LIVE sstables only, which may be only partially written public List<String> getSSTablesForKey(String key) { - DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key)); + DecoratedKey dk = decorateKey(metadata.getKeyValidator().fromString(key)); try (OpOrder.Group op = readOrdering.start()) { List<String> files = new ArrayList<>(); @@ -1489,7 +1484,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean keyIter.hasNext(); ) { RowCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); + DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key)); if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) invalidateCachedPartition(dk); } @@ -1500,7 +1495,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean keyIter.hasNext(); ) { CounterCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); + DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey)); if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } @@ -1618,7 +1613,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (logger.isDebugEnabled()) logger.debug("using 
snapshot sstable {}", entries.getKey()); // open without tracking hotness - sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner, true, false); + sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, false); // This is technically not necessary since it's a snapshot but makes things easier refs.tryRef(sstable); } @@ -2080,10 +2075,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return n; } + public IPartitioner getPartitioner() + { + return metadata.partitioner; + } + + public DecoratedKey decorateKey(ByteBuffer key) + { + return metadata.decorateKey(key); + } + /** true if this CFS contains secondary index data */ public boolean isIndex() { - return partitioner instanceof LocalPartitioner; + return metadata.isIndex(); } public Iterable<ColumnFamilyStore> concatWithIndexes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 358b0ac..023f572 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -27,7 +27,6 @@ import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; /** * Groups both the range of partitions to query, and the clustering index filter to @@ -374,7 +373,7 @@ public class DataRange public DataRange deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { - AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version); + AbstractBounds<PartitionPosition> range = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); if (in.readBoolean()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 6ff880c..73189a6 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -33,9 +33,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.ColumnDefinition; @@ -46,7 +46,6 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UUIDType; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.FailureDetector; @@ -131,8 +130,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - DecoratedKey key = StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)); - + ByteBuffer key = UUIDType.instance.decompose(targetId); Clustering clustering = 
SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version); ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value); @@ -179,9 +177,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) { - DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes); Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds()); - PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, BTreeBackedRow.singleCellRow(clustering, cell)); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, tokenBytes, BTreeBackedRow.singleCellRow(clustering, cell)); new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } @@ -204,8 +201,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean if (!StorageService.instance.getTokenMetadata().isMember(endpoint)) return; UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint); - DecoratedKey dk = StorageService.getPartitioner().decorateKey(ByteBuffer.wrap(UUIDGen.decompose(hostId))); - final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, dk, System.currentTimeMillis(), FBUtilities.nowInSeconds())); + ByteBuffer key = ByteBuffer.wrap(UUIDGen.decompose(hostId)); + final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, key, System.currentTimeMillis(), FBUtilities.nowInSeconds())); // execute asynchronously to avoid blocking caller (which may be processing gossip) Runnable runnable = new Runnable() @@ -368,7 +365,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID 
hostId = Gossiper.instance.getHostId(endpoint); logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint); final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId)); - DecoratedKey epkey = StorageService.getPartitioner().decorateKey(hostIdBytes); final AtomicInteger rowsReplayed = new AtomicInteger(0); @@ -380,7 +376,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group op = hintStore.readOrdering.start(); - RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, epkey).queryMemtableAndDisk(hintStore, op), nowInSec)) + RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, hostIdBytes).queryMemtableAndDisk(hintStore, op), nowInSec)) { List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList(); @@ -480,7 +476,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean ColumnFilter.all(hintStore.metadata), RowFilter.NONE, DataLimits.cqlLimits(Integer.MAX_VALUE, 1), - DataRange.allData(StorageService.getPartitioner())); + DataRange.allData(hintStore.metadata.partitioner)); try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { @@ -546,12 +542,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean public List<String> listEndpointsPendingHints() { - Token.TokenFactory tokenFactory = StorageService.getPartitioner().getTokenFactory(); - // Extract the keys as strings to be reported. 
- LinkedList<String> result = new LinkedList<>(); + List<String> result = new ArrayList<>(); + ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds()); - try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) + try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); + UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup)) { while (iter.hasNext()) { @@ -560,10 +556,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean // We don't delete by range on the hints table, so we don't have to worry about the // iterator returning only range tombstone marker if (partition.hasNext()) - result.addFirst(tokenFactory.toString(partition.partitionKey().getToken())); + result.add(UUIDType.instance.compose(partition.partitionKey().getKey()).toString()); } } } + return result; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index ecaf063..5ec9fe5 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -436,7 +436,6 @@ public class Memtable implements Comparable<Memtable> (long)partitions.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, - cfs.partitioner, sstableMetadataCollector, new SerializationHeader(cfs.metadata, columns, stats), txn)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index ace114b..d6b0a43 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ 
b/src/java/org/apache/cassandra/db/Mutation.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import org.apache.commons.lang3.StringUtils; @@ -31,7 +32,6 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,6 +107,7 @@ public class Mutation implements IMutation public Mutation add(PartitionUpdate update) { assert update != null; + assert update.partitionKey().getPartitioner() == key.getPartitioner(); PartitionUpdate prev = modifications.put(update.metadata().cfId, update); if (prev != null) // developer error @@ -270,15 +271,14 @@ public class Mutation implements IMutation public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { - String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that if (version < MessagingService.VERSION_20) - keyspaceName = in.readUTF(); + in.readUTF(); // read pre-2.0 keyspace name - DecoratedKey key = null; + ByteBuffer key = null; int size; if (version < MessagingService.VERSION_30) { - key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); + key = ByteBufferUtil.readWithShortLength(in); size = in.readInt(); } else @@ -288,23 +288,19 @@ public class Mutation implements IMutation assert size > 0; + PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, flag, key); if (size == 1) - return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key)); + return new Mutation(update); Map<UUID, PartitionUpdate> modifications = new HashMap<>(size); - PartitionUpdate update = null; - 
for (int i = 0; i < size; ++i) + DecoratedKey dk = update.partitionKey(); + for (int i = 1; i < size; ++i) { - update = PartitionUpdate.serializer.deserialize(in, version, flag, key); + update = PartitionUpdate.serializer.deserialize(in, version, flag, dk); modifications.put(update.metadata().cfId, update); } - if (keyspaceName == null) - keyspaceName = update.metadata().ksName; - if (key == null) - key = update.partitionKey(); - - return new Mutation(keyspaceName, key, modifications); + return new Mutation(update.metadata().ksName, dk, modifications); } public Mutation deserialize(DataInputPlus in, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/PartitionPosition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java index afb446d..ac5258d 100644 --- a/src/java/org/apache/cassandra/db/PartitionPosition.java +++ b/src/java/org/apache/cassandra/db/PartitionPosition.java @@ -84,7 +84,7 @@ public interface PartitionPosition extends RingPosition<PartitionPosition> if (kind == Kind.ROW_KEY) { ByteBuffer k = ByteBufferUtil.readWithShortLength(in); - return StorageService.getPartitioner().decorateKey(k); + return p.decorateKey(k); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index d48fca5..18b6950 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -90,7 +90,7 @@ public class PartitionRangeReadCommand extends ReadCommand ColumnFilter.all(metadata), 
RowFilter.NONE, DataLimits.NONE, - DataRange.allData(StorageService.getPartitioner())); + DataRange.allData(metadata.partitioner)); } public DataRange dataRange() http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java index c06a7f7..e4f05b0 100644 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -223,7 +223,7 @@ public class RowUpdateBuilder return (DecoratedKey)partitionKey[0]; ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey)); - return StorageService.getPartitioner().decorateKey(key); + return metadata.decorateKey(key); } private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index 5ffbd55..b0958fc 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db; +import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Sets; @@ -67,6 +68,17 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } + public SinglePartitionNamesCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + 
RowFilter rowFilter, + DataLimits limits, + ByteBuffer key, + ClusteringIndexNamesFilter clusteringIndexFilter) + { + this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter); + } + public SinglePartitionNamesCommand copy() { return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 3d4e42e..6e9e2d5 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.cache.*; @@ -57,6 +58,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter F clusteringIndexFilter) { super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; } @@ -145,6 +147,20 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL); } + /** + * Creates a new read command that queries a single partition in its entirety. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use as "now" for this query. + * @param key the partition key for the partition to query. 
+ * + * @return a newly created read command that queries all the rows of {@code key}. + */ + public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key) + { + return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL); + } + public DecoratedKey partitionKey() { return partitionKey; @@ -486,7 +502,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException { - DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in)); + DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in)); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); if (filter instanceof ClusteringIndexNamesFilter) return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java index b4cbbd6..bb9a35e 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db; +import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterables; @@ -97,6 +98,22 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus return new 
SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter); } + /** + * Creates a new single partition slice command for the provided slices. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use as "now" for this query. + * @param key the partition key for the partition to query. + * @param slices the slices of rows to query. + * + * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will + * query every column for the table (without limit or row filtering) and be in forward order. + */ + public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices) + { + return create(metadata, nowInSec, metadata.decorateKey(key), slices); + } + public SinglePartitionSliceCommand copy() { return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index e31feaa..d17eaf7 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -451,11 +451,6 @@ public final class SystemKeyspace DECOMMISSIONED } - private static DecoratedKey decorate(ByteBuffer key) - { - return StorageService.getPartitioner().decorateKey(key); - } - public static void finishStartup() { persistLocalMetadata(); @@ -564,7 +559,7 @@ public final class SystemKeyspace public static void updateMaterializedViewBuildStatus(String ksname, String viewName, Token token) { String req = "INSERT INTO system.%s (keyspace_name, view_name, 
last_token) VALUES (?, ?, ?)"; - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory(); executeInternal(String.format(req, MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS), ksname, viewName, factory.toString(token)); } @@ -583,7 +578,7 @@ public final class SystemKeyspace generation = row.getInt("generation_number"); if (row.has("last_key")) { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + Token.TokenFactory factory = MaterializedViewsBuildsInProgress.partitioner.getTokenFactory(); lastKey = factory.fromString(row.getString("last_key")); } @@ -717,7 +712,9 @@ public final class SystemKeyspace private static Set<String> tokensAsSet(Collection<Token> tokens) { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + if (tokens.isEmpty()) + return Collections.emptySet(); + Token.TokenFactory factory = StorageService.instance.getTokenFactory(); Set<String> s = new HashSet<>(tokens.size()); for (Token tk : tokens) s.add(factory.toString(tk)); @@ -726,7 +723,7 @@ public final class SystemKeyspace private static Collection<Token> deserializeTokens(Collection<String> tokensStrings) { - Token.TokenFactory factory = StorageService.getPartitioner().getTokenFactory(); + Token.TokenFactory factory = StorageService.instance.getTokenFactory(); List<Token> tokens = new ArrayList<>(tokensStrings.size()); for (String tk : tokensStrings) tokens.add(factory.fromString(tk)); @@ -1165,8 +1162,7 @@ public final class SystemKeyspace public static void updateSizeEstimates(String keyspace, String table, Map<Range<Token>, Pair<Long, Long>> estimates) { long timestamp = FBUtilities.timestampMicros(); - DecoratedKey key = decorate(UTF8Type.instance.decompose(keyspace)); - PartitionUpdate update = new PartitionUpdate(SizeEstimates, key, SizeEstimates.partitionColumns(), estimates.size()); + PartitionUpdate update = new 
PartitionUpdate(SizeEstimates, UTF8Type.instance.decompose(keyspace), SizeEstimates.partitionColumns(), estimates.size()); Mutation mutation = new Mutation(update); // delete all previous values with a single range tombstone. http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 3dd6f38..548c661 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -961,7 +961,6 @@ public class CompactionManager implements CompactionManagerMBean expectedBloomFilterSize, repairedAt, sstable.getSSTableLevel(), - cfs.partitioner, sstable.header, txn); } @@ -993,7 +992,6 @@ public class CompactionManager implements CompactionManagerMBean (long) expectedBloomFilterSize, repairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(sstables, cfs.metadata.comparator, minLevel), SerializationHeader.make(cfs.metadata, sstables), txn); @@ -1085,7 +1083,7 @@ public class CompactionManager implements CompactionManagerMBean } // determine tree depth from number of partitions, but cap at 20 to prevent large tree. int depth = numPartitions > 0 ? 
(int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0; - MerkleTree tree = new MerkleTree(cfs.partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); + MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth)); long start = System.nanoTime(); try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 0cee370..7fd5717 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -80,7 +80,7 @@ public class LeveledManifest for (int i = 0; i < generations.length; i++) { generations[i] = new ArrayList<>(); - lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound(); + lastCompactedKeys[i] = cfs.getPartitioner().getMinimumToken().minKeyBound(); } compactionCounter = new int[MAX_LEVEL_COUNT]; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 81e307a..5b3f6c7 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -179,7 +179,7 @@ public class Scrubber implements Closeable DecoratedKey key = null; try { - key = 
sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); } catch (Throwable th) { @@ -249,7 +249,7 @@ public class Scrubber implements Closeable { outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s", dataSizeFromIndex, dataStartFromIndex)); - key = sstable.partitioner.decorateKey(currentIndexKey); + key = sstable.decorateKey(currentIndexKey); try { dataFile.seek(dataStartFromIndex); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index be0dd2a..b8a102e 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -83,7 +83,6 @@ public class Upgrader estimatedRows, repairedAt, cfs.metadata, - cfs.partitioner, sstableMetadataCollector, SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)), transaction); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index 90a97a0..ae4e966 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -146,7 +146,7 @@ public class Verifier implements Closeable DecoratedKey key = null; try { - key = sstable.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); + key = sstable.decorateKey(ByteBufferUtil.readWithShortLength(dataFile)); } catch (Throwable th) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java index cdacddc..53dad55 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java @@ -52,7 +52,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter estimatedTotalKeys, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index ad58967..a44ea7e 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -67,7 +67,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter keysPerSSTable, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -96,7 +95,6 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter averageEstimatedKeysPerSSTable, minRepairedAt, cfs.metadata, - cfs.partitioner, new 
MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java index 9902357..3942b1e 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java @@ -56,7 +56,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -75,7 +74,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter estimatedTotalKeys / estimatedSSTables, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, level), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java index 14cb795..5d8670d 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java @@ 
-89,7 +89,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter currentPartitionsToWrite, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); @@ -113,7 +112,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter currentPartitionsToWrite, minRepairedAt, cfs.metadata, - cfs.partitioner, new MetadataCollector(allSSTables, cfs.metadata.comparator, 0), SerializationHeader.make(cfs.metadata, nonExpiredSSTables), txn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/69f77cbd/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index 842cbb9..4bb0bc4 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -26,7 +26,6 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -51,10 +50,9 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec columnDef = columnDefs.iterator().next(); - CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef); + CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef, getIndexKeyComparator()); indexCfs = 
ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace, indexedCfMetadata.cfName, - new LocalPartitioner(getIndexKeyComparator()), indexedCfMetadata, baseCfs.getTracker().loadsstables); }