Updated Branches: refs/heads/cassandra-1.2 a19a15a2b -> 6a4af0c77 refs/heads/trunk a7b2ff65a -> 76c8fe467
Ensure that PerRowSecondaryIndex updates see the most recent values patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5397 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6a4af0c7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6a4af0c7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6a4af0c7 Branch: refs/heads/cassandra-1.2 Commit: 6a4af0c77ac0aa3600e0a778497e6856d3e356cb Parents: a19a15a Author: Jonathan Ellis <[email protected]> Authored: Thu Apr 4 10:55:17 2013 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Apr 4 13:23:35 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/AtomicSortedColumns.java | 2 + .../cassandra/db/index/SecondaryIndexManager.java | 49 +++-- test/unit/org/apache/cassandra/SchemaLoader.java | 29 +++ .../db/index/PerRowSecondaryIndexTest.java | 151 +++++++++++++++ 5 files changed, 217 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c1e2540..e64358f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.2.4 + * Ensure that PerRowSecondaryIndex updates see the most recent values + (CASSANDRA-5397) * avoid duplicate index entries ind PrecompactedRow and ParallelCompactionIterable (CASSANDRA-5395) * remove the index entry on oldColumn when new column is a tombstone http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index 
83aabea..552ad6a 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -195,6 +195,8 @@ public class AtomicSortedColumns implements ISortedColumns } while (!ref.compareAndSet(current, modified)); + indexer.commit(); + return sizeDelta; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 83374d9..df7ceff 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -19,21 +19,18 @@ package org.apache.cassandra.db.index; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.*; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.thrift.Column; @@ -54,6 +51,8 @@ public class SecondaryIndexManager public void update(IColumn oldColumn, IColumn column) { } public void 
remove(IColumn current) { } + + public void commit() {} }; /** @@ -580,11 +579,17 @@ public class SecondaryIndexManager public static interface Updater { + /** called when constructing the index against pre-existing data */ public void insert(IColumn column); + /** called when updating the index from a memtable */ public void update(IColumn oldColumn, IColumn column); + /** called when lazy-updating the index during compaction (CASSANDRA-2897) */ public void remove(IColumn current); + + /** called after memtable updates are complete (CASSANDRA-5397) */ + public void commit(); } private class PerColumnIndexUpdater implements Updater @@ -630,12 +635,17 @@ public class SecondaryIndexManager ((PerColumnSecondaryIndex) index).delete(key.key, column); } + + public void commit() + { + // this is a no-op as per-column index updates are applied immediately + } } private class MixedIndexUpdater implements Updater { private final DecoratedKey key; - Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<Class<? 
extends SecondaryIndex>>(); + ConcurrentHashMap<SecondaryIndex, ByteBuffer> deferredUpdates = new ConcurrentHashMap<SecondaryIndex, ByteBuffer>(); public MixedIndexUpdater(DecoratedKey key) { @@ -651,14 +661,13 @@ public class SecondaryIndexManager if (index == null) return; - if (index instanceof PerColumnSecondaryIndex) + if (index instanceof PerColumnSecondaryIndex) { ((PerColumnSecondaryIndex) index).insert(key.key, column); } else { - if (appliedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex) index).index(key.key); + deferredUpdates.putIfAbsent(index, key.key); } } @@ -668,7 +677,7 @@ public class SecondaryIndexManager if (index == null) return; - if (index instanceof PerColumnSecondaryIndex) + if (index instanceof PerColumnSecondaryIndex) { ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn); if (!column.isMarkedForDelete()) @@ -676,8 +685,7 @@ public class SecondaryIndexManager } else { - if (appliedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex) index).index(key.key); + deferredUpdates.putIfAbsent(index, key.key); } } @@ -690,14 +698,23 @@ public class SecondaryIndexManager if (index == null) return; - if (index instanceof PerColumnSecondaryIndex) + if (index instanceof PerColumnSecondaryIndex) { ((PerColumnSecondaryIndex) index).delete(key.key, column); } else { - if (appliedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex) index).index(key.key); + // per-row secondary indexes are assumed to keep the index up-to-date at insert time, rather + // than performing lazy updates + } + } + + public void commit() + { + for (Map.Entry<SecondaryIndex, ByteBuffer> update : deferredUpdates.entrySet()) + { + assert update.getKey() instanceof PerRowSecondaryIndex; + ((PerRowSecondaryIndex) update.getKey()).index(update.getValue()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/test/unit/org/apache/cassandra/SchemaLoader.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 48fbc04..cb17665 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -24,6 +24,8 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Charsets; +import org.apache.cassandra.db.index.PerRowSecondaryIndexTest; +import org.apache.cassandra.db.index.SecondaryIndex; import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; @@ -113,6 +115,8 @@ public class SchemaLoader String ks_kcs = "KeyCacheSpace"; String ks_rcs = "RowCacheSpace"; String ks_nocommit = "NoCommitlogSpace"; + String ks_prsi = "PerRowSecondaryIndex"; + Class<? extends AbstractReplicationStrategy> simple = SimpleStrategy.class; @@ -290,6 +294,12 @@ public class SchemaLoader opts_rf1, standardCFMD(ks_nocommit, "Standard1", withOldCfIds))); + // PerRowSecondaryIndexTest + schema.add(KSMetaData.testMetadata(ks_prsi, + simple, + opts_rf1, + perRowIndexedCFMD(ks_prsi, "Indexed1", withOldCfIds))); + if (Boolean.parseBoolean(System.getProperty("cassandra.test.compression", "false"))) useCompression(schema); @@ -297,6 +307,25 @@ public class SchemaLoader return schema; } + private static CFMetaData perRowIndexedCFMD(String ksName, String cfName, boolean withOldCfIds) + { + final Map<String, String> indexOptions = Collections.singletonMap( + SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, + PerRowSecondaryIndexTest.TestIndex.class.getName()); + return standardCFMD(ksName, cfName, withOldCfIds) + .keyValidator(AsciiType.instance) + .columnMetadata(new HashMap<ByteBuffer, ColumnDefinition>() + {{ + ByteBuffer cName = ByteBuffer.wrap("indexed".getBytes(Charsets.UTF_8)); + put(cName, new ColumnDefinition(cName, + AsciiType.instance, + IndexType.CUSTOM, + indexOptions, + 
ByteBufferUtil.bytesToHex(cName), + null)); + }}); + } + private static void useCompression(List<KSMetaData> schema) { for (KSMetaData ksm : schema) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6a4af0c7/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java new file mode 100644 index 0000000..3a4f947 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.index; + + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.ByteBufferUtil; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Set; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; + +public class PerRowSecondaryIndexTest extends SchemaLoader +{ + + // test that when index(key) is called on a PRSI index, + // the data to be indexed can be read using the supplied + // key. TestIndex.index(key) simply reads the data to be + // indexed & stashes it in a static variable for inspection + // in the test. + + @Test + public void testIndexInsertAndUpdate() throws IOException + { + // create a row then test that the configured index instance was able to read the row + RowMutation rm; + rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1")); + rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("foo"), 1); + rm.apply(); + + ColumnFamily indexedRow = TestIndex.LAST_INDEXED_ROW; + assertNotNull(indexedRow); + assertEquals(ByteBufferUtil.bytes("foo"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); + + // update the row and verify what was indexed + rm = new RowMutation("PerRowSecondaryIndex", ByteBufferUtil.bytes("k1")); + rm.add(new QueryPath("Indexed1", null, ByteBufferUtil.bytes("indexed")), ByteBufferUtil.bytes("bar"), 2); + rm.apply(); + + indexedRow = TestIndex.LAST_INDEXED_ROW; + assertNotNull(indexedRow); + assertEquals(ByteBufferUtil.bytes("bar"), indexedRow.getColumn(ByteBufferUtil.bytes("indexed")).value()); + } + + public static class TestIndex extends 
PerRowSecondaryIndex + { + public static ColumnFamily LAST_INDEXED_ROW; + + @Override + public void index(ByteBuffer rowKey, ColumnFamily cf) + { + } + + @Override + public void index(ByteBuffer rowKey) + { + QueryFilter filter = QueryFilter.getIdentityFilter(DatabaseDescriptor.getPartitioner().decorateKey(rowKey), + new QueryPath(baseCfs.getColumnFamilyName())); + LAST_INDEXED_ROW = baseCfs.getColumnFamily(filter); + } + + @Override + public void delete(DecoratedKey key) + { + } + + @Override + public void init() + { + } + + @Override + public void reload() + { + } + + @Override + public void validateOptions() throws ConfigurationException + { + } + + @Override + public String getIndexName() + { + return null; + } + + @Override + protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns) + { + return null; + } + + @Override + public void forceBlockingFlush() + { + } + + @Override + public long getLiveSize() + { + return 0; + } + + @Override + public ColumnFamilyStore getIndexCfs() + { + return null; + } + + @Override + public void removeIndex(ByteBuffer columnName) + { + } + + @Override + public void invalidate() + { + } + + @Override + public void truncate(long truncatedAt) + { + } + } +}
