Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6fda9c6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6fda9c6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6fda9c6 Branch: refs/heads/cassandra-2.0 Commit: f6fda9c69c3aa38fbd66d9c005cdc08f81e32962 Parents: 678aa37 fd12966 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Wed Sep 11 00:57:01 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Sep 11 00:57:01 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 2 +- .../db/index/PerRowSecondaryIndex.java | 11 +--- .../db/index/SecondaryIndexManager.java | 64 ++++++++++---------- .../db/SecondaryIndexColumnSizeTest.java | 4 +- .../db/index/PerRowSecondaryIndexTest.java | 21 +++---- 6 files changed, 46 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index f9a3b80,12e2017..808c558 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -26,54 -10,14 +26,55 @@@ Merged from 1.2 * Fix loading index summary containing empty key (CASSANDRA-5965) * Correctly handle limits in CompositesSearcher (CASSANDRA-5975) * Pig: handle CQL collections (CASSANDRA-5867) + * Pass the updated cf to the PRSI index() method (CASSANDRA-5999) -1.2.9 +2.0.0 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138) + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931) + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928) + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938) + * Add stream session progress to JMX (CASSANDRA-4757) + * Fix NPE during CAS operation 
(CASSANDRA-5925) +Merged from 1.2: * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900) - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases - (CASSANDRA-5800) - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831) + * Don't announce schema version until we've loaded the changes locally + (CASSANDRA-5904) + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903) + * Properly handle parsing huge map and set literals (CASSANDRA-5893) + + +2.0.0-rc2 + * enable vnodes by default (CASSANDRA-5869) + * fix CAS contention timeout (CASSANDRA-5830) + * fix HsHa to respect max frame size (CASSANDRA-4573) + * Fix (some) 2i on composite components omissions (CASSANDRA-5851) + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880) +Merged from 1.2: + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855) + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868) + * cqlsh: add support for multiline comments (CASSANDRA-5798) + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns + (CASSANDRA-5856) + + +2.0.0-rc1 + * improve DecimalSerializer performance (CASSANDRA-5837) + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690) + * fix schema-related trigger issues (CASSANDRA-5774) + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138) + * Fix assertion error during repair (CASSANDRA-5801) + * Fix range tombstone bug (CASSANDRA-5805) + * DC-local CAS (CASSANDRA-5797) + * Add a native_protocol_version column to the system.local table (CASSANDRA-5819) + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822) + * Fix buffer underflow on socket close (CASSANDRA-5792) +Merged from 1.2: + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814) + * cqlsh: add collections support to COPY (CASSANDRA-5698) + * retry important messages for any IOException (CASSANDRA-5804) + * Allow empty IN relations in 
SELECT/UPDATE/DELETE statements (CASSANDRA-5626) + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812) * fix bulk-loading compressed sstables (CASSANDRA-5820) * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter (CASSANDRA-5824) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index 667a656,0000000..4914c11 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -1,454 -1,0 +1,454 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.pager.QueryPagers; +import org.apache.cassandra.tracing.Tracing; + +/** + * It represents a Keyspace. + */ +public class Keyspace +{ + public static final String SYSTEM_KS = "system"; + private static final int DEFAULT_PAGE_SIZE = 10000; + + private static final Logger logger = LoggerFactory.getLogger(Keyspace.class); + + /** + * accesses to CFS.memtable should acquire this for thread safety. + * CFS.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation. + * <p/> + * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.) + */ + public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock(); + + // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure + // proper directories here as well as in CassandraDaemon. 
+ static + { + if (!StorageService.instance.isClientMode()) + DatabaseDescriptor.createAllDirectories(); + } + + public final KSMetaData metadata; + + /* ColumnFamilyStore per column family */ + private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>(); + private volatile AbstractReplicationStrategy replicationStrategy; + public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>() + { + public Keyspace apply(String keyspaceName) + { + return Keyspace.open(keyspaceName); + } + }; + + public static Keyspace open(String keyspaceName) + { + return open(keyspaceName, Schema.instance, true); + } + + public static Keyspace openWithoutSSTables(String keyspaceName) + { + return open(keyspaceName, Schema.instance, false); + } + + private static Keyspace open(String keyspaceName, Schema schema, boolean loadSSTables) + { + Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName); + + if (keyspaceInstance == null) + { + // instantiate the Keyspace. we could use putIfAbsent but it's important to make sure it is only done once + // per keyspace, so we synchronize and re-check before doing it. 
+ synchronized (Keyspace.class) + { + keyspaceInstance = schema.getKeyspaceInstance(keyspaceName); + if (keyspaceInstance == null) + { + // open and store the keyspace + keyspaceInstance = new Keyspace(keyspaceName, loadSSTables); + schema.storeKeyspaceInstance(keyspaceInstance); + + // keyspace has to be constructed and in the cache before cacheRow can be called + for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores()) + cfs.initRowCache(); + } + } + } + return keyspaceInstance; + } + + public static Keyspace clear(String keyspaceName) + { + return clear(keyspaceName, Schema.instance); + } + + public static Keyspace clear(String keyspaceName, Schema schema) + { + synchronized (Keyspace.class) + { + Keyspace t = schema.removeKeyspaceInstance(keyspaceName); + if (t != null) + { + for (ColumnFamilyStore cfs : t.getColumnFamilyStores()) + t.unloadCf(cfs); + } + return t; + } + } + + /** + * Removes every SSTable in the directory from the appropriate DataTracker's view. + * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. 
+ */ + public static void removeUnreadableSSTables(File directory) + { + for (Keyspace keyspace : Keyspace.all()) + { + for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores()) + { + for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes()) + cfs.maybeRemoveUnreadableSSTables(directory); + } + } + } + + public Collection<ColumnFamilyStore> getColumnFamilyStores() + { + return Collections.unmodifiableCollection(columnFamilyStores.values()); + } + + public ColumnFamilyStore getColumnFamilyStore(String cfName) + { + UUID id = Schema.instance.getId(getName(), cfName); + if (id == null) + throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName)); + return getColumnFamilyStore(id); + } + + public ColumnFamilyStore getColumnFamilyStore(UUID id) + { + ColumnFamilyStore cfs = columnFamilyStores.get(id); + if (cfs == null) + throw new IllegalArgumentException("Unknown CF " + id); + return cfs; + } + + /** + * Take a snapshot of the specific column family, or the entire set of column families + * if columnFamily is null with a given timestamp + * + * @param snapshotName the tag associated with the name of the snapshot. This value may not be null + * @param columnFamilyName the column family to snapshot or all on null + * @throws IOException if the column family doesn't exist + */ + public void snapshot(String snapshotName, String columnFamilyName) throws IOException + { + assert snapshotName != null; + boolean tookSnapShot = false; + for (ColumnFamilyStore cfStore : columnFamilyStores.values()) + { + if (columnFamilyName == null || cfStore.name.equals(columnFamilyName)) + { + tookSnapShot = true; + cfStore.snapshot(snapshotName); + } + } + + if ((columnFamilyName != null) && !tookSnapShot) + throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist."); + } + + /** + * @param clientSuppliedName may be null. 
+ * @return the name of the snapshot + */ + public static String getTimestampedSnapshotName(String clientSuppliedName) + { + String snapshotName = Long.toString(System.currentTimeMillis()); + if (clientSuppliedName != null && !clientSuppliedName.equals("")) + { + snapshotName = snapshotName + "-" + clientSuppliedName; + } + return snapshotName; + } + + /** + * Check whether a snapshot already exists for a given name. + * + * @param snapshotName the user supplied snapshot name + * @return true if the snapshot exists + */ + public boolean snapshotExists(String snapshotName) + { + assert snapshotName != null; + for (ColumnFamilyStore cfStore : columnFamilyStores.values()) + { + if (cfStore.snapshotExists(snapshotName)) + return true; + } + return false; + } + + /** + * Clear all the snapshots for a given keyspace. + * + * @param snapshotName the user supplied snapshot name. If empty or null, + * all the snapshots will be cleaned + */ + public void clearSnapshot(String snapshotName) + { + for (ColumnFamilyStore cfStore : columnFamilyStores.values()) + { + cfStore.clearSnapshot(snapshotName); + } + } + + /** + * @return A list of open SSTableReaders + */ + public List<SSTableReader> getAllSSTables() + { + List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size()); + for (ColumnFamilyStore cfStore : columnFamilyStores.values()) + list.addAll(cfStore.getSSTables()); + return list; + } + + private Keyspace(String keyspaceName, boolean loadSSTables) + { + metadata = Schema.instance.getKSMetaData(keyspaceName); + assert metadata != null : "Unknown keyspace " + keyspaceName; + createReplicationStrategy(metadata); + + for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values())) + { + logger.debug("Initializing {}.{}", getName(), cfm.cfName); + initCf(cfm.cfId, cfm.cfName, loadSSTables); + } + } + + public void createReplicationStrategy(KSMetaData ksm) + { + if (replicationStrategy != null) + 
StorageService.instance.getTokenMetadata().unregister(replicationStrategy); + + replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name, + ksm.strategyClass, + StorageService.instance.getTokenMetadata(), + DatabaseDescriptor.getEndpointSnitch(), + ksm.strategyOptions); + } + + // best invoked on the compaction manager. + public void dropCf(UUID cfId) + { + assert columnFamilyStores.containsKey(cfId); + ColumnFamilyStore cfs = columnFamilyStores.remove(cfId); + if (cfs == null) + return; + + unloadCf(cfs); + } + + // disassociate a cfs from this keyspace instance. + private void unloadCf(ColumnFamilyStore cfs) + { + cfs.forceBlockingFlush(); + cfs.invalidate(); + } + + /** + * adds a cf to internal structures (ends up creating disk files). + */ + public void initCf(UUID cfId, String cfName, boolean loadSSTables) + { + ColumnFamilyStore cfs = columnFamilyStores.get(cfId); + + if (cfs == null) + { + // CFS being created for the first time, either on server startup or new CF being added. + // We don't worry about races here; startup is safe, and adding multiple identical CFs + // simultaneously is a "don't do that" scenario. + ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables)); + // CFS mbean instantiation will error out before we hit this, but in case that changes... + if (oldCfs != null) + throw new IllegalStateException("added multiple mappings for cf id " + cfId); + } + else + { + // re-initializing an existing CF. This will happen if you cleared the schema + // on this node and it's getting repopulated from the rest of the cluster. 
+ assert cfs.name.equals(cfName); + cfs.metadata.reload(); + cfs.reload(); + } + } + + public Row getRow(QueryFilter filter) + { + ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName()); + ColumnFamily columnFamily = cfStore.getColumnFamily(filter); + return new Row(filter.key, columnFamily); + } + + public void apply(RowMutation mutation, boolean writeCommitLog) + { + apply(mutation, writeCommitLog, true); + } + + /** + * This method appends a row to the global CommitLog, then updates memtables and indexes. + * + * @param mutation the row to write. Must not be modified after calling apply, since commitlog append + * may happen concurrently, depending on the CL Executor type. + * @param writeCommitLog false to disable commitlog append entirely + * @param updateIndexes false to disable index updates (used by CollationController "defragmenting") + */ + public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes) + { + // write the mutation to the commitlog and memtables + Tracing.trace("Acquiring switchLock read lock"); + switchLock.readLock().lock(); + try + { + if (writeCommitLog) + { + Tracing.trace("Appending to commitlog"); + CommitLog.instance.add(mutation); + } + + DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key()); + for (ColumnFamily cf : mutation.getColumnFamilies()) + { + ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); + if (cfs == null) + { + logger.error("Attempting to mutate non-existant column family " + cf.id()); + continue; + } + + Tracing.trace("Adding to {} memtable", cf.metadata().cfName); - cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater); ++ cfs.apply(key, cf, updateIndexes ? 
cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater); + } + } + finally + { + switchLock.readLock().unlock(); + } + } + + public AbstractReplicationStrategy getReplicationStrategy() + { + return replicationStrategy; + } + + /** + * @param key row to index + * @param cfs ColumnFamily to index row in + * @param idxNames columns to index, in comparator order + */ + public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames) + { + if (logger.isDebugEnabled()) + logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key)); + + Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames); + + switchLock.readLock().lock(); + try + { + Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE); + while (pager.hasNext()) + { + ColumnFamily cf = pager.next(); + ColumnFamily cf2 = cf.cloneMeShallow(); + for (Column column : cf) + { + if (cfs.indexManager.indexes(column.name(), indexes)) + cf2.addColumn(column); + } + cfs.indexManager.indexRow(key.key, cf2); + } + } + finally + { + switchLock.readLock().unlock(); + } + } + + public List<Future<?>> flush() + { + List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size()); + for (UUID cfId : columnFamilyStores.keySet()) + futures.add(columnFamilyStores.get(cfId).forceFlush()); + return futures; + } + + public static Iterable<Keyspace> all() + { + return Iterables.transform(Schema.instance.getKeyspaces(), keyspaceTransformer); + } + + public static Iterable<Keyspace> nonSystem() + { + return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), keyspaceTransformer); + } + + public static Iterable<Keyspace> system() + { + return Iterables.transform(Schema.systemKeyspaceNames, keyspaceTransformer); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "(name='" + getName() + "')"; + } + + public String getName() + { + return metadata.name; + } +} 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 67ab3e3,52d2152..eff9537 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@@ -77,8 -78,8 +77,8 @@@ public class SecondaryIndexManage public SecondaryIndexManager(ColumnFamilyStore baseCfs) { -- indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>(); -- rowLevelIndexMap = new HashMap<Class<? extends SecondaryIndex>, SecondaryIndex>(); ++ indexesByColumn = new ConcurrentSkipListMap<>(); ++ rowLevelIndexMap = new HashMap<>(); this.baseCfs = baseCfs; } @@@ -106,15 -106,15 +106,13 @@@ Set<SecondaryIndex> reloadedIndexes = Collections.newSetFromMap(new IdentityHashMap<SecondaryIndex, Boolean>()); for (SecondaryIndex index : indexesByColumn.values()) -- { if (reloadedIndexes.add(index)) index.reload(); -- } } public Set<String> allIndexesNames() { - Set<String> names = new HashSet<String>(indexesByColumn.size()); - Set<String> names = new HashSet<String>(); ++ Set<String> names = new HashSet<>(indexesByColumn.size()); for (SecondaryIndex index : indexesByColumn.values()) names.add(index.getIndexName()); return names; @@@ -157,16 -167,12 +155,16 @@@ for (SecondaryIndex index : indexes) { if (index.indexes(name)) - return index; + { + if (matching == null) - matching = new ArrayList<SecondaryIndex>(); ++ matching = new ArrayList<>(); + matching.add(index); + } } - return null; + return matching == null ? 
Collections.<SecondaryIndex>emptyList() : matching; } - public boolean indexes(IColumn column) + public boolean indexes(Column column) { return indexes(column.name()); } @@@ -198,6 -204,6 +196,7 @@@ for (SecondaryIndexSearcher searcher : searchers) if (!searcher.isIndexing(clause)) return false; ++ return true; } @@@ -323,16 -334,16 +322,14 @@@ */ public List<String> getBuiltIndexes() { -- List<String> indexList = new ArrayList<String>(); ++ List<String> indexList = new ArrayList<>(); for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet()) { SecondaryIndex index = entry.getValue(); if (index.isIndexBuilt(entry.getKey())) -- { indexList.add(entry.getValue().getIndexName()); -- } } return indexList; @@@ -343,7 -364,7 +340,7 @@@ */ public Collection<ColumnFamilyStore> getIndexesBackedByCfs() { -- ArrayList<ColumnFamilyStore> cfsList = new ArrayList<ColumnFamilyStore>(); ++ ArrayList<ColumnFamilyStore> cfsList = new ArrayList<>(); for (SecondaryIndex index: indexesByColumn.values()) { @@@ -406,18 -427,18 +403,16 @@@ if (index instanceof PerRowSecondaryIndex) { if (appliedRowLevelIndexes == null) -- appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>(); ++ appliedRowLevelIndexes = new HashSet<>(); if (appliedRowLevelIndexes.add(index.getClass())) ((PerRowSecondaryIndex)index).index(key, cf); } else { - for (IColumn column : cf) - { + for (Column column : cf) - { if (index.indexes(column.name())) ((PerColumnSecondaryIndex) index).insert(key, column); -- } } } } @@@ -442,7 -463,7 +437,7 @@@ if (index instanceof PerRowSecondaryIndex) { if (cleanedRowLevelIndexes == null) -- cleanedRowLevelIndexes = new HashSet<Class<? 
extends SecondaryIndex>>(); ++ cleanedRowLevelIndexes = new HashSet<>(); if (cleanedRowLevelIndexes.add(index.getClass())) ((PerRowSecondaryIndex)index).delete(key); @@@ -474,7 -504,7 +478,7 @@@ */ private List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause) { -- Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<String, Set<ByteBuffer>>(); ++ Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<>(); //Group columns by type for (IndexExpression ix : clause) @@@ -488,14 -518,14 +492,14 @@@ if (columns == null) { -- columns = new HashSet<ByteBuffer>(); ++ columns = new HashSet<>(); groupByIndexType.put(index.getClass().getCanonicalName(), columns); } columns.add(ix.column_name); } -- List<SecondaryIndexSearcher> indexSearchers = new ArrayList<SecondaryIndexSearcher>(groupByIndexType.size()); ++ List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size()); //create searcher per type for (Set<ByteBuffer> column : groupByIndexType.values()) @@@ -527,12 -560,12 +531,10 @@@ public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames) { -- List<SecondaryIndex> result = new ArrayList<SecondaryIndex>(); ++ List<SecondaryIndex> result = new ArrayList<>(); for (SecondaryIndex index : indexesByColumn.values()) -- { if (idxNames.contains(index.getIndexName())) result.add(index); -- } return result; } @@@ -550,8 -583,8 +552,8 @@@ public boolean validate(Column column) { - SecondaryIndex index = getIndexForColumn(column.name); + SecondaryIndex index = getIndexForColumn(column.name()); - return index != null ? 
index.validate(column) : true; + return index == null || index.validate(column); } public static interface Updater @@@ -572,25 -605,28 +574,25 @@@ private class StandardUpdater implements Updater { private final DecoratedKey key; + private final ColumnFamily cf; - public StandardUpdater(DecoratedKey key) + public StandardUpdater(DecoratedKey key, ColumnFamily cf) { this.key = key; + this.cf = cf; } - public void insert(IColumn column) + public void insert(Column column) { - if (column.isMarkedForDelete()) + if (column.isMarkedForDelete(System.currentTimeMillis())) return; - SecondaryIndex index = indexFor(column.name()); - if (index == null) - return; - - if (index instanceof PerColumnSecondaryIndex) - ((PerColumnSecondaryIndex) index).insert(key.key, column); + for (SecondaryIndex index : indexFor(column.name())) - { + if (index instanceof PerColumnSecondaryIndex) + ((PerColumnSecondaryIndex) index).insert(key.key, column); - } } - public void update(IColumn oldColumn, IColumn column) + public void update(Column oldColumn, Column column) { if (oldColumn.equals(column)) return; @@@ -608,16 -645,17 +610,14 @@@ } } - public void remove(IColumn column) + public void remove(Column column) { - if (column.isMarkedForDelete()) + if (column.isMarkedForDelete(System.currentTimeMillis())) return; - SecondaryIndex index = indexFor(column.name()); - if (index == null) - return; - - if (index instanceof PerColumnSecondaryIndex) - ((PerColumnSecondaryIndex) index).delete(key.key, column); + for (SecondaryIndex index : indexFor(column.name())) - { + if (index instanceof PerColumnSecondaryIndex) + ((PerColumnSecondaryIndex) index).delete(key.key, column); - } } public void updateRowLevelIndexes() http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java index 
3a10b80,3281a5a..51d9ee8 --- a/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java +++ b/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java @@@ -47,10 -51,10 +47,10 @@@ public class SecondaryIndexColumnSizeTe // for read buffer.flip(); - column.value = buffer; + Column column = new Column(ByteBufferUtil.bytes("test"), buffer, 0); - MockRowIndex mockRowIndex = new MockRowIndex(); - MockColumnIndex mockColumnIndex = new MockColumnIndex(); + SecondaryIndexColumnSizeTest.MockRowIndex mockRowIndex = new SecondaryIndexColumnSizeTest.MockRowIndex(); + SecondaryIndexColumnSizeTest.MockColumnIndex mockColumnIndex = new SecondaryIndexColumnSizeTest.MockColumnIndex(); assertTrue(mockRowIndex.validate(column)); assertFalse(mockColumnIndex.validate(column)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java index 3e9d3a1,c720d21..b983e6e --- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java @@@ -58,22 -59,22 +58,22 @@@ public class PerRowSecondaryIndexTest e // create a row then test that the configured index instance was able to read the row RowMutation rm; rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1")); - rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("foo"), 1); + rm.add("Indexed1", ByteBufferUtil.bytes("indexed"), ByteBufferUtil.bytes("foo"), 1); rm.apply(); - ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW; assertNotNull(indexedRow); assertEquals(ByteBufferUtil.bytes("foo"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); // update the row and 
verify what was indexed rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1")); - rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("bar"), 2); + rm.add("Indexed1", ByteBufferUtil.bytes("indexed"), ByteBufferUtil.bytes("bar"), 2); rm.apply(); - indexedRow = TestIndex.LAST_INDEXED_ROW; + indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW; assertNotNull(indexedRow); assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); - assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); + assertTrue(Arrays.equals("k1".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array())); } @Test @@@ -82,17 -83,17 +82,17 @@@ // issue a column delete and test that the configured index instance was notified to update RowMutation rm; rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2")); - rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1); + rm.delete("Indexed1", ByteBufferUtil.bytes("indexed"), 1); rm.apply(); - ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW; assertNotNull(indexedRow); - for (IColumn column : indexedRow.getSortedColumns()) + for (Column column : indexedRow.getSortedColumns()) { - assertTrue(column.isMarkedForDelete()); + assertTrue(column.isMarkedForDelete(System.currentTimeMillis())); } - assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); + assertTrue(Arrays.equals("k2".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array())); } @Test @@@ -101,16 -102,16 +101,16 @@@ // issue a row level delete and test that the configured index instance was notified to update RowMutation rm; rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3")); - rm.delete(new QueryPath("Indexed1"), 1); + rm.delete("Indexed1", 1); rm.apply(); - 
ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW; assertNotNull(indexedRow); - for (IColumn column : indexedRow.getSortedColumns()) + for (Column column : indexedRow.getSortedColumns()) { - assertTrue(column.isMarkedForDelete()); + assertTrue(column.isMarkedForDelete(System.currentTimeMillis())); } - assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array())); + assertTrue(Arrays.equals("k3".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array())); } public static class TestIndex extends PerRowSecondaryIndex @@@ -127,14 -128,8 +127,9 @@@ @Override public void index(ByteBuffer rowKey, ColumnFamily cf) { - } - - @Override - public void index(ByteBuffer rowKey) - { QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey), - new QueryPath(baseCfs.getColumnFamilyName())); + baseCfs.getColumnFamilyName(), + System.currentTimeMillis()); LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter); LAST_INDEXED_KEY = rowKey; }