Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
        src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
        src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6fda9c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6fda9c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6fda9c6

Branch: refs/heads/cassandra-2.0
Commit: f6fda9c69c3aa38fbd66d9c005cdc08f81e32962
Parents: 678aa37 fd12966
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Wed Sep 11 00:57:01 2013 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Wed Sep 11 00:57:01 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  |  2 +-
 .../db/index/PerRowSecondaryIndex.java          | 11 +---
 .../db/index/SecondaryIndexManager.java         | 64 ++++++++++----------
 .../db/SecondaryIndexColumnSizeTest.java        |  4 +-
 .../db/index/PerRowSecondaryIndexTest.java      | 21 +++----
 6 files changed, 46 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
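
The headline change merged here from 1.2 is CASSANDRA-5999 ("Pass the updated cf to the PRSI index() method"): a per-row secondary index (PRSI) is now handed the updated ColumnFamily together with the row key, so an index implementation no longer has to re-read the base row to see what changed. The following self-contained Java sketch illustrates only that call shape; the classes in it are simplified, hypothetical stand-ins, not the real org.apache.cassandra.db types.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins (not the real org.apache.cassandra.db classes),
// used only to illustrate the call shape introduced by CASSANDRA-5999.
class StubColumnFamily
{
    final Map<ByteBuffer, ByteBuffer> columns = new HashMap<ByteBuffer, ByteBuffer>();
}

abstract class StubPerRowIndex
{
    // The 1.2 shape was index(ByteBuffer rowKey): the index had to re-read the row itself.
    // After this merge the updated cf is handed in together with the key.
    public abstract void index(ByteBuffer rowKey, StubColumnFamily cf);
}

public class PrsiCallShapeSketch
{
    public static void main(String[] args)
    {
        StubPerRowIndex index = new StubPerRowIndex()
        {
            public void index(ByteBuffer rowKey, StubColumnFamily cf)
            {
                // Index straight from the columns that were just applied;
                // no extra read of the base column family is needed.
                System.out.println("indexing " + cf.columns.size() + " column(s)");
            }
        };

        StubColumnFamily cf = new StubColumnFamily();
        cf.columns.put(ByteBuffer.wrap("indexed".getBytes()), ByteBuffer.wrap("foo".getBytes()));
        index.index(ByteBuffer.wrap("k1".getBytes()), cf);
    }
}

In the real patch the same idea shows up below as SecondaryIndexManager.updaterFor(key, cf) in Keyspace.apply() and as PerRowSecondaryIndex.index(rowKey, cf) in the test's TestIndex.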


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f9a3b80,12e2017..808c558
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -26,54 -10,14 +26,55 @@@ Merged from 1.2
   * Fix loading index summary containing empty key (CASSANDRA-5965)
   * Correctly handle limits in CompositesSearcher (CASSANDRA-5975)
   * Pig: handle CQL collections (CASSANDRA-5867)
+  * Pass the updated cf to the PRSI index() method (CASSANDRA-5999)
  
  
 -1.2.9
 +2.0.0
 + * Fix thrift validation when inserting into CQL3 tables (CASSANDRA-5138)
 + * Fix periodic memtable flushing behavior with clean memtables (CASSANDRA-5931)
 + * Fix dateOf() function for pre-2.0 timestamp columns (CASSANDRA-5928)
 + * Fix SSTable unintentionally loads BF when opened for batch (CASSANDRA-5938)
 + * Add stream session progress to JMX (CASSANDRA-4757)
 + * Fix NPE during CAS operation (CASSANDRA-5925)
 +Merged from 1.2:
   * Fix getBloomFilterDiskSpaceUsed for AlwaysPresentFilter (CASSANDRA-5900)
 - * migrate 1.1 schema_columnfamilies.key_alias column to key_aliases
 -   (CASSANDRA-5800)
 - * add --migrate option to sstableupgrade and sstablescrub (CASSANDRA-5831)
 + * Don't announce schema version until we've loaded the changes locally
 +   (CASSANDRA-5904)
 + * Fix to support off heap bloom filters size greater than 2 GB (CASSANDRA-5903)
 + * Properly handle parsing huge map and set literals (CASSANDRA-5893)
 +
 +
 +2.0.0-rc2
 + * enable vnodes by default (CASSANDRA-5869)
 + * fix CAS contention timeout (CASSANDRA-5830)
 + * fix HsHa to respect max frame size (CASSANDRA-4573)
 + * Fix (some) 2i on composite components omissions (CASSANDRA-5851)
 + * cqlsh: add DESCRIBE FULL SCHEMA variant (CASSANDRA-5880)
 +Merged from 1.2:
 + * Correctly validate sparse composite cells in scrub (CASSANDRA-5855)
 + * Add KeyCacheHitRate metric to CF metrics (CASSANDRA-5868)
 + * cqlsh: add support for multiline comments (CASSANDRA-5798)
 + * Handle CQL3 SELECT duplicate IN restrictions on clustering columns
 +   (CASSANDRA-5856)
 +
 +
 +2.0.0-rc1
 + * improve DecimalSerializer performance (CASSANDRA-5837)
 + * fix potential spurious wakeup in AsyncOneResponse (CASSANDRA-5690)
 + * fix schema-related trigger issues (CASSANDRA-5774)
 + * Better validation when accessing CQL3 table from thrift (CASSANDRA-5138)
 + * Fix assertion error during repair (CASSANDRA-5801)
 + * Fix range tombstone bug (CASSANDRA-5805)
 + * DC-local CAS (CASSANDRA-5797)
 + * Add a native_protocol_version column to the system.local table (CASSANDRA-5819)
 + * Use index_interval from cassandra.yaml when upgraded (CASSANDRA-5822)
 + * Fix buffer underflow on socket close (CASSANDRA-5792)
 +Merged from 1.2:
 + * Fix reading DeletionTime from 1.1-format sstables (CASSANDRA-5814)
 + * cqlsh: add collections support to COPY (CASSANDRA-5698)
 + * retry important messages for any IOException (CASSANDRA-5804)
 + * Allow empty IN relations in SELECT/UPDATE/DELETE statements (CASSANDRA-5626)
 + * cqlsh: fix crashing on Windows due to libedit detection (CASSANDRA-5812)
   * fix bulk-loading compressed sstables (CASSANDRA-5820)
   * (Hadoop) fix quoting in CqlPagingRecordReader and CqlRecordWriter 
     (CASSANDRA-5824)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index 667a656,0000000..4914c11
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -1,454 -1,0 +1,454 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import com.google.common.base.Function;
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.filter.QueryFilter;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.db.index.SecondaryIndexManager;
 +import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.locator.AbstractReplicationStrategy;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.pager.QueryPagers;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +/**
 + * Represents a Keyspace.
 + */
 +public class Keyspace
 +{
 +    public static final String SYSTEM_KS = "system";
 +    private static final int DEFAULT_PAGE_SIZE = 10000;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
 +
 +    /**
 +     * accesses to CFS.memtable should acquire this for thread safety.
 +     * CFS.maybeSwitchMemtable should acquire the writeLock; see that method for the full explanation.
 +     * <p/>
 +     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
 +     */
 +    public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
 +
 +    // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
 +    // proper directories here as well as in CassandraDaemon.
 +    static
 +    {
 +        if (!StorageService.instance.isClientMode())
 +            DatabaseDescriptor.createAllDirectories();
 +    }
 +
 +    public final KSMetaData metadata;
 +
 +    /* ColumnFamilyStore per column family */
 +    private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
 +    private volatile AbstractReplicationStrategy replicationStrategy;
 +    public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>()
 +    {
 +        public Keyspace apply(String keyspaceName)
 +        {
 +            return Keyspace.open(keyspaceName);
 +        }
 +    };
 +
 +    public static Keyspace open(String keyspaceName)
 +    {
 +        return open(keyspaceName, Schema.instance, true);
 +    }
 +
 +    public static Keyspace openWithoutSSTables(String keyspaceName)
 +    {
 +        return open(keyspaceName, Schema.instance, false);
 +    }
 +
 +    private static Keyspace open(String keyspaceName, Schema schema, boolean loadSSTables)
 +    {
 +        Keyspace keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
 +
 +        if (keyspaceInstance == null)
 +        {
 +            // instantiate the Keyspace.  we could use putIfAbsent but it's important to make sure it is only done once
 +            // per keyspace, so we synchronize and re-check before doing it.
 +            synchronized (Keyspace.class)
 +            {
 +                keyspaceInstance = schema.getKeyspaceInstance(keyspaceName);
 +                if (keyspaceInstance == null)
 +                {
 +                    // open and store the keyspace
 +                    keyspaceInstance = new Keyspace(keyspaceName, loadSSTables);
 +                    schema.storeKeyspaceInstance(keyspaceInstance);
 +
 +                    // keyspace has to be constructed and in the cache before cacheRow can be called
 +                    for (ColumnFamilyStore cfs : keyspaceInstance.getColumnFamilyStores())
 +                        cfs.initRowCache();
 +                }
 +            }
 +        }
 +        return keyspaceInstance;
 +    }
 +
 +    public static Keyspace clear(String keyspaceName)
 +    {
 +        return clear(keyspaceName, Schema.instance);
 +    }
 +
 +    public static Keyspace clear(String keyspaceName, Schema schema)
 +    {
 +        synchronized (Keyspace.class)
 +        {
 +            Keyspace t = schema.removeKeyspaceInstance(keyspaceName);
 +            if (t != null)
 +            {
 +                for (ColumnFamilyStore cfs : t.getColumnFamilyStores())
 +                    t.unloadCf(cfs);
 +            }
 +            return t;
 +        }
 +    }
 +
 +    /**
 +     * Removes every SSTable in the directory from the appropriate DataTracker's view.
 +     * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily.
 +     */
 +    public static void removeUnreadableSSTables(File directory)
 +    {
 +        for (Keyspace keyspace : Keyspace.all())
 +        {
 +            for (ColumnFamilyStore baseCfs : keyspace.getColumnFamilyStores())
 +            {
 +                for (ColumnFamilyStore cfs : baseCfs.concatWithIndexes())
 +                    cfs.maybeRemoveUnreadableSSTables(directory);
 +            }
 +        }
 +    }
 +
 +    public Collection<ColumnFamilyStore> getColumnFamilyStores()
 +    {
 +        return Collections.unmodifiableCollection(columnFamilyStores.values());
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStore(String cfName)
 +    {
 +        UUID id = Schema.instance.getId(getName(), cfName);
 +        if (id == null)
 +            throw new IllegalArgumentException(String.format("Unknown keyspace/cf pair (%s.%s)", getName(), cfName));
 +        return getColumnFamilyStore(id);
 +    }
 +
 +    public ColumnFamilyStore getColumnFamilyStore(UUID id)
 +    {
 +        ColumnFamilyStore cfs = columnFamilyStores.get(id);
 +        if (cfs == null)
 +            throw new IllegalArgumentException("Unknown CF " + id);
 +        return cfs;
 +    }
 +
 +    /**
 +     * Take a snapshot of the specified column family, or of the entire set of column families
 +     * if columnFamilyName is null.
 +     *
 +     * @param snapshotName     the tag associated with the name of the snapshot.  This value may not be null
 +     * @param columnFamilyName the column family to snapshot, or all column families if null
 +     * @throws IOException if the column family doesn't exist
 +     */
 +    public void snapshot(String snapshotName, String columnFamilyName) throws IOException
 +    {
 +        assert snapshotName != null;
 +        boolean tookSnapShot = false;
 +        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
 +        {
 +            if (columnFamilyName == null || cfStore.name.equals(columnFamilyName))
 +            {
 +                tookSnapShot = true;
 +                cfStore.snapshot(snapshotName);
 +            }
 +        }
 +
 +        if ((columnFamilyName != null) && !tookSnapShot)
 +            throw new IOException("Failed taking snapshot. Column family " + columnFamilyName + " does not exist.");
 +    }
 +
 +    /**
 +     * @param clientSuppliedName may be null.
 +     * @return the name of the snapshot
 +     */
 +    public static String getTimestampedSnapshotName(String clientSuppliedName)
 +    {
 +        String snapshotName = Long.toString(System.currentTimeMillis());
 +        if (clientSuppliedName != null && !clientSuppliedName.equals(""))
 +        {
 +            snapshotName = snapshotName + "-" + clientSuppliedName;
 +        }
 +        return snapshotName;
 +    }
 +
 +    /**
 +     * Check whether a snapshot already exists for a given name.
 +     *
 +     * @param snapshotName the user supplied snapshot name
 +     * @return true if the snapshot exists
 +     */
 +    public boolean snapshotExists(String snapshotName)
 +    {
 +        assert snapshotName != null;
 +        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
 +        {
 +            if (cfStore.snapshotExists(snapshotName))
 +                return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Clear all the snapshots for a given keyspace.
 +     *
 +     * @param snapshotName the user supplied snapshot name. If empty or null,
 +     *                     all the snapshots will be cleaned
 +     */
 +    public void clearSnapshot(String snapshotName)
 +    {
 +        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
 +        {
 +            cfStore.clearSnapshot(snapshotName);
 +        }
 +    }
 +
 +    /**
 +     * @return A list of open SSTableReaders
 +     */
 +    public List<SSTableReader> getAllSSTables()
 +    {
 +        List<SSTableReader> list = new ArrayList<SSTableReader>(columnFamilyStores.size());
 +        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
 +            list.addAll(cfStore.getSSTables());
 +        return list;
 +    }
 +
 +    private Keyspace(String keyspaceName, boolean loadSSTables)
 +    {
 +        metadata = Schema.instance.getKSMetaData(keyspaceName);
 +        assert metadata != null : "Unknown keyspace " + keyspaceName;
 +        createReplicationStrategy(metadata);
 +
 +        for (CFMetaData cfm : new ArrayList<CFMetaData>(metadata.cfMetaData().values()))
 +        {
 +            logger.debug("Initializing {}.{}", getName(), cfm.cfName);
 +            initCf(cfm.cfId, cfm.cfName, loadSSTables);
 +        }
 +    }
 +
 +    public void createReplicationStrategy(KSMetaData ksm)
 +    {
 +        if (replicationStrategy != null)
 +            StorageService.instance.getTokenMetadata().unregister(replicationStrategy);
 +
 +        replicationStrategy = AbstractReplicationStrategy.createReplicationStrategy(ksm.name,
 +                                                                                    ksm.strategyClass,
 +                                                                                    StorageService.instance.getTokenMetadata(),
 +                                                                                    DatabaseDescriptor.getEndpointSnitch(),
 +                                                                                    ksm.strategyOptions);
 +    }
 +
 +    // best invoked on the compaction manager.
 +    public void dropCf(UUID cfId)
 +    {
 +        assert columnFamilyStores.containsKey(cfId);
 +        ColumnFamilyStore cfs = columnFamilyStores.remove(cfId);
 +        if (cfs == null)
 +            return;
 +
 +        unloadCf(cfs);
 +    }
 +
 +    // disassociate a cfs from this keyspace instance.
 +    private void unloadCf(ColumnFamilyStore cfs)
 +    {
 +        cfs.forceBlockingFlush();
 +        cfs.invalidate();
 +    }
 +
 +    /**
 +     * adds a cf to internal structures (ends up creating disk files).
 +     */
 +    public void initCf(UUID cfId, String cfName, boolean loadSSTables)
 +    {
 +        ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
 +
 +        if (cfs == null)
 +        {
 +            // CFS being created for the first time, either on server startup or new CF being added.
 +            // We don't worry about races here; startup is safe, and adding multiple identical CFs
 +            // simultaneously is a "don't do that" scenario.
 +            ColumnFamilyStore oldCfs = columnFamilyStores.putIfAbsent(cfId, ColumnFamilyStore.createColumnFamilyStore(this, cfName, loadSSTables));
 +            // CFS mbean instantiation will error out before we hit this, but in case that changes...
 +            if (oldCfs != null)
 +                throw new IllegalStateException("added multiple mappings for cf id " + cfId);
 +        }
 +        else
 +        {
 +            // re-initializing an existing CF.  This will happen if you cleared the schema
 +            // on this node and it's getting repopulated from the rest of the cluster.
 +            assert cfs.name.equals(cfName);
 +            cfs.metadata.reload();
 +            cfs.reload();
 +        }
 +    }
 +
 +    public Row getRow(QueryFilter filter)
 +    {
 +        ColumnFamilyStore cfStore = getColumnFamilyStore(filter.getColumnFamilyName());
 +        ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
 +        return new Row(filter.key, columnFamily);
 +    }
 +
 +    public void apply(RowMutation mutation, boolean writeCommitLog)
 +    {
 +        apply(mutation, writeCommitLog, true);
 +    }
 +
 +    /**
 +     * This method appends a row to the global CommitLog, then updates memtables and indexes.
 +     *
 +     * @param mutation       the row to write.  Must not be modified after calling apply, since commitlog append
 +     *                       may happen concurrently, depending on the CL Executor type.
 +     * @param writeCommitLog false to disable commitlog append entirely
 +     * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
 +     */
 +    public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
 +    {
 +        // write the mutation to the commitlog and memtables
 +        Tracing.trace("Acquiring switchLock read lock");
 +        switchLock.readLock().lock();
 +        try
 +        {
 +            if (writeCommitLog)
 +            {
 +                Tracing.trace("Appending to commitlog");
 +                CommitLog.instance.add(mutation);
 +            }
 +
 +            DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
 +            for (ColumnFamily cf : mutation.getColumnFamilies())
 +            {
 +                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
 +                if (cfs == null)
 +                {
 +                    logger.error("Attempting to mutate non-existent column family " + cf.id());
 +                    continue;
 +                }
 +
 +                Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
-                 cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key) : SecondaryIndexManager.nullUpdater);
++                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater);
 +            }
 +        }
 +        finally
 +        {
 +            switchLock.readLock().unlock();
 +        }
 +    }
 +
 +    public AbstractReplicationStrategy getReplicationStrategy()
 +    {
 +        return replicationStrategy;
 +    }
 +
 +    /**
 +     * @param key row to index
 +     * @param cfs ColumnFamily to index row in
 +     * @param idxNames columns to index, in comparator order
 +     */
 +    public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
 +
 +        Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
 +
 +        switchLock.readLock().lock();
 +        try
 +        {
 +            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
 +            while (pager.hasNext())
 +            {
 +                ColumnFamily cf = pager.next();
 +                ColumnFamily cf2 = cf.cloneMeShallow();
 +                for (Column column : cf)
 +                {
 +                    if (cfs.indexManager.indexes(column.name(), indexes))
 +                        cf2.addColumn(column);
 +                }
 +                cfs.indexManager.indexRow(key.key, cf2);
 +            }
 +        }
 +        finally
 +        {
 +            switchLock.readLock().unlock();
 +        }
 +    }
 +
 +    public List<Future<?>> flush()
 +    {
 +        List<Future<?>> futures = new ArrayList<Future<?>>(columnFamilyStores.size());
 +        for (UUID cfId : columnFamilyStores.keySet())
 +            futures.add(columnFamilyStores.get(cfId).forceFlush());
 +        return futures;
 +    }
 +
 +    public static Iterable<Keyspace> all()
 +    {
 +        return Iterables.transform(Schema.instance.getKeyspaces(), keyspaceTransformer);
 +    }
 +
 +    public static Iterable<Keyspace> nonSystem()
 +    {
 +        return Iterables.transform(Schema.instance.getNonSystemKeyspaces(), keyspaceTransformer);
 +    }
 +
 +    public static Iterable<Keyspace> system()
 +    {
 +        return Iterables.transform(Schema.systemKeyspaceNames, keyspaceTransformer);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return getClass().getSimpleName() + "(name='" + getName() + "')";
 +    }
 +
 +    public String getName()
 +    {
 +        return metadata.name;
 +    }
 +}
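
Keyspace.apply() above is the write path this merge touches: while holding the switchLock read lock it appends the mutation to the commitlog (when requested) and then applies each ColumnFamily to its memtable, handing the index manager an updater built from both the key and the updated cf. Below is a rough, self-contained sketch of that lock-protected ordering; the helper names are placeholders, not real Cassandra methods.

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the locking pattern used in Keyspace.apply(); placeholder helpers only.
public class ApplyPatternSketch
{
    private static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();

    static void apply(String mutation, boolean writeCommitLog)
    {
        // Writers share the read lock; a memtable switch takes the write lock.
        switchLock.readLock().lock();
        try
        {
            if (writeCommitLog)
                appendToCommitLog(mutation);      // durable append first
            applyToMemtablesAndIndexes(mutation); // then memtables + index updater
        }
        finally
        {
            switchLock.readLock().unlock();       // always released, even on error
        }
    }

    private static void appendToCommitLog(String mutation)
    {
        System.out.println("commitlog <- " + mutation);
    }

    private static void applyToMemtablesAndIndexes(String mutation)
    {
        System.out.println("memtable  <- " + mutation);
    }

    public static void main(String[] args)
    {
        apply("k1: indexed=foo", true);
    }
}

The read lock lets many writers proceed concurrently, while a memtable switch, which needs exclusive access, takes the write lock.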

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 67ab3e3,52d2152..eff9537
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@@ -77,8 -78,8 +77,8 @@@ public class SecondaryIndexManage
  
      public SecondaryIndexManager(ColumnFamilyStore baseCfs)
      {
--        indexesByColumn = new ConcurrentSkipListMap<ByteBuffer, SecondaryIndex>();
--        rowLevelIndexMap = new HashMap<Class<? extends SecondaryIndex>, SecondaryIndex>();
++        indexesByColumn = new ConcurrentSkipListMap<>();
++        rowLevelIndexMap = new HashMap<>();
  
          this.baseCfs = baseCfs;
      }
@@@ -106,15 -106,15 +106,13 @@@
  
          Set<SecondaryIndex> reloadedIndexes = Collections.newSetFromMap(new IdentityHashMap<SecondaryIndex, Boolean>());
          for (SecondaryIndex index : indexesByColumn.values())
--        {
              if (reloadedIndexes.add(index))
                  index.reload();
--        }
      }
  
      public Set<String> allIndexesNames()
      {
-         Set<String> names = new HashSet<String>(indexesByColumn.size());
 -        Set<String> names = new HashSet<String>();
++        Set<String> names = new HashSet<>(indexesByColumn.size());
          for (SecondaryIndex index : indexesByColumn.values())
              names.add(index.getIndexName());
          return names;
@@@ -157,16 -167,12 +155,16 @@@
          for (SecondaryIndex index : indexes)
          {
              if (index.indexes(name))
 -                return index;
 +            {
 +                if (matching == null)
-                     matching = new ArrayList<SecondaryIndex>();
++                    matching = new ArrayList<>();
 +                matching.add(index);
 +            }
          }
 -        return null;
 +        return matching == null ? Collections.<SecondaryIndex>emptyList() : matching;
      }
  
 -    public boolean indexes(IColumn column)
 +    public boolean indexes(Column column)
      {
          return indexes(column.name());
      }
@@@ -198,6 -204,6 +196,7 @@@
          for (SecondaryIndexSearcher searcher : searchers)
              if (!searcher.isIndexing(clause))
                  return false;
++
          return true;
      }
  
@@@ -323,16 -334,16 +322,14 @@@
       */
      public List<String> getBuiltIndexes()
      {
--        List<String> indexList = new ArrayList<String>();
++        List<String> indexList = new ArrayList<>();
  
          for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
          {
              SecondaryIndex index = entry.getValue();
  
              if (index.isIndexBuilt(entry.getKey()))
--            {
                  indexList.add(entry.getValue().getIndexName());
--            }
          }
  
          return indexList;
@@@ -343,7 -364,7 +340,7 @@@
       */
      public Collection<ColumnFamilyStore> getIndexesBackedByCfs()
      {
--        ArrayList<ColumnFamilyStore> cfsList = new ArrayList<ColumnFamilyStore>();
++        ArrayList<ColumnFamilyStore> cfsList = new ArrayList<>();
  
          for (SecondaryIndex index: indexesByColumn.values())
          {
@@@ -406,18 -427,18 +403,16 @@@
              if (index instanceof PerRowSecondaryIndex)
              {
                  if (appliedRowLevelIndexes == null)
--                    appliedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
++                    appliedRowLevelIndexes = new HashSet<>();
  
                  if (appliedRowLevelIndexes.add(index.getClass()))
                      ((PerRowSecondaryIndex)index).index(key, cf);
              }
              else
              {
 -                for (IColumn column : cf)
 -                {
 +                for (Column column : cf)
-                 {
                      if (index.indexes(column.name()))
                          ((PerColumnSecondaryIndex) index).insert(key, column);
--                }
              }
          }
      }
@@@ -442,7 -463,7 +437,7 @@@
              if (index instanceof PerRowSecondaryIndex)
              {
                  if (cleanedRowLevelIndexes == null)
--                    cleanedRowLevelIndexes = new HashSet<Class<? extends SecondaryIndex>>();
++                    cleanedRowLevelIndexes = new HashSet<>();
  
                  if (cleanedRowLevelIndexes.add(index.getClass()))
                      ((PerRowSecondaryIndex)index).delete(key);
@@@ -474,7 -504,7 +478,7 @@@
       */
      private List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause)
      {
--        Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<String, Set<ByteBuffer>>();
++        Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<>();
  
          //Group columns by type
          for (IndexExpression ix : clause)
@@@ -488,14 -518,14 +492,14 @@@
  
              if (columns == null)
              {
--                columns = new HashSet<ByteBuffer>();
++                columns = new HashSet<>();
                  groupByIndexType.put(index.getClass().getCanonicalName(), columns);
              }
  
              columns.add(ix.column_name);
          }
  
--        List<SecondaryIndexSearcher> indexSearchers = new ArrayList<SecondaryIndexSearcher>(groupByIndexType.size());
++        List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size());
  
          //create searcher per type
          for (Set<ByteBuffer> column : groupByIndexType.values())
@@@ -527,12 -560,12 +531,10 @@@
  
      public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
      {
--        List<SecondaryIndex> result = new ArrayList<SecondaryIndex>();
++        List<SecondaryIndex> result = new ArrayList<>();
          for (SecondaryIndex index : indexesByColumn.values())
--        {
              if (idxNames.contains(index.getIndexName()))
                  result.add(index);
--        }
          return result;
      }
  
@@@ -550,8 -583,8 +552,8 @@@
  
      public boolean validate(Column column)
      {
 -        SecondaryIndex index = getIndexForColumn(column.name);
 +        SecondaryIndex index = getIndexForColumn(column.name());
-         return index != null ? index.validate(column) : true;
+         return index == null || index.validate(column);
      }
  
      public static interface Updater
@@@ -572,25 -605,28 +574,25 @@@
      private class StandardUpdater implements Updater
      {
          private final DecoratedKey key;
+         private final ColumnFamily cf;
  
-         public StandardUpdater(DecoratedKey key)
+         public StandardUpdater(DecoratedKey key, ColumnFamily cf)
          {
              this.key = key;
+             this.cf = cf;
          }
  
 -        public void insert(IColumn column)
 +        public void insert(Column column)
          {
 -            if (column.isMarkedForDelete())
 +            if (column.isMarkedForDelete(System.currentTimeMillis()))
                  return;
  
 -            SecondaryIndex index = indexFor(column.name());
 -            if (index == null)
 -                return;
 -
 -            if (index instanceof PerColumnSecondaryIndex)
 -                ((PerColumnSecondaryIndex) index).insert(key.key, column);
 +            for (SecondaryIndex index : indexFor(column.name()))
-             {
 +                if (index instanceof PerColumnSecondaryIndex)
 +                    ((PerColumnSecondaryIndex) index).insert(key.key, column);
-             }
          }
  
 -        public void update(IColumn oldColumn, IColumn column)
 +        public void update(Column oldColumn, Column column)
          {
              if (oldColumn.equals(column))
                  return;
@@@ -608,16 -645,17 +610,14 @@@
              }
          }
  
 -        public void remove(IColumn column)
 +        public void remove(Column column)
          {
 -            if (column.isMarkedForDelete())
 +            if (column.isMarkedForDelete(System.currentTimeMillis()))
                  return;
  
 -            SecondaryIndex index = indexFor(column.name());
 -            if (index == null)
 -                return;
 -
 -            if (index instanceof PerColumnSecondaryIndex)
 -                ((PerColumnSecondaryIndex) index).delete(key.key, column);
 +            for (SecondaryIndex index : indexFor(column.name()))
-             {
 +                if (index instanceof PerColumnSecondaryIndex)
 +                   ((PerColumnSecondaryIndex) index).delete(key.key, column);
-             }
          }
  
          public void updateRowLevelIndexes()
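
The SecondaryIndexManager hunks above also show how a write is fanned out to indexes: a PerRowSecondaryIndex receives the whole updated row at most once per index class, while per-column indexes are fed individual columns, and the StandardUpdater now carries the ColumnFamily alongside the key, in line with the CASSANDRA-5999 goal of passing the updated cf to the PRSI index() method. The sketch below shows that dispatch shape with hypothetical stand-in types rather than the real index classes; the per-column membership check is omitted.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins (not the real Cassandra index classes) illustrating the
// dispatch in SecondaryIndexManager.index(key, cf) shown in the hunk above.
interface SketchIndex {}
interface SketchPerRowIndex extends SketchIndex { void index(String key, List<String> cf); }
interface SketchPerColumnIndex extends SketchIndex { void insert(String key, String column); }

public class IndexDispatchSketch
{
    static void index(String key, List<String> cf, List<SketchIndex> indexes)
    {
        Set<Class<?>> appliedRowLevelIndexes = null;
        for (SketchIndex index : indexes)
        {
            if (index instanceof SketchPerRowIndex)
            {
                // Row-level indexes see the whole updated cf, at most once per index class.
                if (appliedRowLevelIndexes == null)
                    appliedRowLevelIndexes = new HashSet<>();
                if (appliedRowLevelIndexes.add(index.getClass()))
                    ((SketchPerRowIndex) index).index(key, cf);
            }
            else
            {
                // Column-level indexes are fed one column at a time
                // (the real code also checks index.indexes(column.name()) first).
                for (String column : cf)
                    ((SketchPerColumnIndex) index).insert(key, column);
            }
        }
    }

    public static void main(String[] args)
    {
        List<SketchIndex> indexes = new ArrayList<>();
        indexes.add((SketchPerRowIndex) (key, cf) -> System.out.println("row index sees: " + cf));
        indexes.add((SketchPerColumnIndex) (key, column) -> System.out.println("column index sees: " + column));
        index("k1", Arrays.asList("indexed=foo"), indexes);
    }
}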

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
index 3a10b80,3281a5a..51d9ee8
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexColumnSizeTest.java
@@@ -47,10 -51,10 +47,10 @@@ public class SecondaryIndexColumnSizeTe
  
          // for read
          buffer.flip();
 -        column.value = buffer;
 +        Column column = new Column(ByteBufferUtil.bytes("test"), buffer, 0);
  
-         MockRowIndex mockRowIndex = new MockRowIndex();
-         MockColumnIndex mockColumnIndex = new MockColumnIndex();
+         SecondaryIndexColumnSizeTest.MockRowIndex mockRowIndex = new SecondaryIndexColumnSizeTest.MockRowIndex();
+         SecondaryIndexColumnSizeTest.MockColumnIndex mockColumnIndex = new SecondaryIndexColumnSizeTest.MockColumnIndex();
  
          assertTrue(mockRowIndex.validate(column));
          assertFalse(mockColumnIndex.validate(column));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6fda9c6/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
index 3e9d3a1,c720d21..b983e6e
--- a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
@@@ -58,22 -59,22 +58,22 @@@ public class PerRowSecondaryIndexTest e
          // create a row then test that the configured index instance was able to read the row
          RowMutation rm;
          rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1"));
 -        rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("foo"), 1);
 +        rm.add("Indexed1", ByteBufferUtil.bytes("indexed"), ByteBufferUtil.bytes("foo"), 1);
          rm.apply();
  
-         ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+         ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW;
          assertNotNull(indexedRow);
          assertEquals(ByteBufferUtil.bytes("foo"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
  
          // update the row and verify what was indexed
          rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1"));
 -        rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("bar"), 2);
 +        rm.add("Indexed1", ByteBufferUtil.bytes("indexed"), ByteBufferUtil.bytes("bar"), 2);
          rm.apply();
  
-         indexedRow = TestIndex.LAST_INDEXED_ROW;
+         indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW;
          assertNotNull(indexedRow);
          assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value());
-         assertTrue(Arrays.equals("k1".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+         assertTrue(Arrays.equals("k1".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array()));
      }
  
      @Test
@@@ -82,17 -83,17 +82,17 @@@
          // issue a column delete and test that the configured index instance was notified to update
          RowMutation rm;
          rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k2"));
 -        rm.delete(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), 1);
 +        rm.delete("Indexed1", ByteBufferUtil.bytes("indexed"), 1);
          rm.apply();
  
-         ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+         ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW;
          assertNotNull(indexedRow);
  
 -        for (IColumn column : indexedRow.getSortedColumns())
 +        for (Column column : indexedRow.getSortedColumns())
          {
 -            assertTrue(column.isMarkedForDelete());
 +            assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
          }
-         assertTrue(Arrays.equals("k2".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+         assertTrue(Arrays.equals("k2".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array()));
      }
  
      @Test
@@@ -101,16 -102,16 +101,16 @@@
          // issue a row level delete and test that the configured index instance was notified to update
          RowMutation rm;
          rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k3"));
 -        rm.delete(new QueryPath("Indexed1"), 1);
 +        rm.delete("Indexed1", 1);
          rm.apply();
  
-         ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW;
+         ColumnFamily indexedRow = PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_ROW;
          assertNotNull(indexedRow);
 -        for (IColumn column : indexedRow.getSortedColumns())
 +        for (Column column : indexedRow.getSortedColumns())
          {
 -            assertTrue(column.isMarkedForDelete());
 +            assertTrue(column.isMarkedForDelete(System.currentTimeMillis()));
          }
-         assertTrue(Arrays.equals("k3".getBytes(), TestIndex.LAST_INDEXED_KEY.array()));
+         assertTrue(Arrays.equals("k3".getBytes(), PerRowSecondaryIndexTest.TestIndex.LAST_INDEXED_KEY.array()));
      }
  
      public static class TestIndex extends PerRowSecondaryIndex
@@@ -127,14 -128,8 +127,9 @@@
          @Override
          public void index(ByteBuffer rowKey, ColumnFamily cf)
          {
-         }
- 
-         @Override
-         public void index(ByteBuffer rowKey)
-         {
              QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey),
 -                                                               new QueryPath(baseCfs.getColumnFamilyName()));
 +                                                               baseCfs.getColumnFamilyName(),
 +                                                               System.currentTimeMillis());
              LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter);
              LAST_INDEXED_KEY = rowKey;
          }
