Repository: ignite Updated Branches: refs/heads/ignite-5937 a5a8fba32 -> d381cdec8
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d381cdec Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d381cdec Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d381cdec Branch: refs/heads/ignite-5937 Commit: d381cdec8a4fa974bb417780b45428a540845d50 Parents: a5a8fba Author: sboikov <[email protected]> Authored: Tue Oct 24 11:41:30 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 24 16:00:46 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 89 +++++++++--- .../cache/persistence/CacheDataRowAdapter.java | 6 +- .../cache/tree/CacheDataRowStore.java | 6 +- .../internal/processors/cache/tree/DataRow.java | 17 ++- .../processors/cache/tree/MvccDataRow.java | 17 ++- .../processors/cache/tree/MvccUpdateRow.java | 5 +- .../cache/mvcc/CacheMvccAbstractTest.java | 65 +++++++++ .../cache/mvcc/CacheMvccTransactionsTest.java | 78 ++++++++++- .../processors/database/BPlusTreeSelfTest.java | 106 ++++++++++++-- .../processors/query/h2/IgniteH2Indexing.java | 3 +- .../query/h2/database/H2RowFactory.java | 8 +- .../query/h2/database/H2TreeIndex.java | 2 +- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 140 +++++++++++++++++++ 13 files changed, 479 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 5d0d51d..8a18751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -1551,8 +1551,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager incrementSize(cctx.cacheId()); } - cleanup(cctx, updateRow.cleanupRows(), false); - CacheDataRow oldRow = updateRow.oldRow(); if (oldRow != null) @@ -1563,6 +1561,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (qryMgr.enabled()) qryMgr.store(updateRow, mvccVer, oldRow); + updatePendingEntries(cctx, updateRow, oldRow); + + cleanup(cctx, updateRow.cleanupRows(), false); + return updateRow.activeTransactions(); } finally { @@ -1641,6 +1643,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (qryMgr.enabled()) qryMgr.remove(key, oldRow, mvccVer); + + clearPendingEntries(cctx, oldRow); } return updateRow.activeTransactions(); @@ -1656,26 +1660,40 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + boolean cleanup = cctx.queries().enabled() || hasPendingEntries; + GridCursor<CacheDataRow> cur = dataTree.find( new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE), new MvccSearchRow(cacheId, key, 1, 1), - CacheDataRowAdapter.RowData.KEY_ONLY); + cleanup ? CacheDataRowAdapter.RowData.NO_KEY : CacheDataRowAdapter.RowData.LINK_ONLY); boolean first = true; while (cur.next()) { CacheDataRow row = cur.get(); + row.key(key); + assert row.link() != 0 : row; boolean rmvd = dataTree.removex(row); - assert rmvd; + assert rmvd : row; + + boolean rmvdVal = versionForRemovedValue(row.mvccCoordinatorVersion()); + + if (cleanup && !rmvdVal) { + if (cctx.queries().enabled()) + cctx.queries().remove(key, row, null); + + if (first) + clearPendingEntries(cctx, row); + } rowStore.removeRow(row.link()); if (first) { - if (!versionForRemovedValue(row.mvccCoordinatorVersion())) + if (!rmvdVal) decrementSize(cctx.cacheId()); first = false; @@ -1684,6 +1702,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** + * @param cctx Cache context. * @param cleanupRows Rows to cleanup. * @param findRmv {@code True} if need keep removed row entry. * @return Removed row link of {@code 0} if not found. @@ -1797,32 +1816,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager KeyCacheObject key = newRow.key(); - long expireTime = newRow.expireTime(); - GridCacheQueryManager qryMgr = cctx.queries(); - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - if (qryMgr.enabled()) qryMgr.store(newRow, null, oldRow); + updatePendingEntries(cctx, newRow, oldRow); + if (oldRow != null) { assert oldRow.link() != 0 : oldRow; - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); - if (newRow.link() != oldRow.link()) rowStore.removeRow(oldRow.link()); } + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); + } + + /** + * @param cctx Cache context. + * @param newRow + * @param oldRow + * @throws IgniteCheckedException If failed. + */ + private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) + throws IgniteCheckedException + { + long expireTime = newRow.expireTime(); + + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + } + if (pendingEntries != null && expireTime != 0) { pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link())); hasPendingEntries = true; } - - updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); } /** {@inheritDoc} */ @@ -1865,14 +1900,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager */ private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { if (oldRow != null) { - int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; - - assert oldRow.link() != 0 : oldRow; - assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : - "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; - - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + clearPendingEntries(cctx, oldRow); decrementSize(cctx.cacheId()); } @@ -1888,6 +1916,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null); } + /** + * @param cctx + * @param oldRow + * @throws IgniteCheckedException + */ + private void clearPendingEntries(GridCacheContext cctx, CacheDataRow oldRow) + throws IgniteCheckedException { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + + assert oldRow.link() != 0 : oldRow; + assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : + "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; + + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + } + /** {@inheritDoc} */ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { key.valueBytes(cctx.cacheObjectContext()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java index d0f2dab..d912629 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java @@ -59,6 +59,7 @@ public class CacheDataRowAdapter implements CacheDataRow { protected CacheObject val; /** */ + @GridToStringInclude protected long expireTime = -1; /** */ @@ -598,7 +599,10 @@ public class CacheDataRowAdapter implements CacheDataRow { KEY_ONLY, /** */ - NO_KEY + NO_KEY, + + /** */ + LINK_ONLY, } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java index 85624d5..5537794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java @@ -73,9 +73,9 @@ public class CacheDataRowStore extends RowStore { * @return Search row. */ MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) { - if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) { - if (rowData == CacheDataRowAdapter.RowData.NO_KEY) - return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr); + if (versionForRemovedValue(crdVer)) { + if (rowData == CacheDataRowAdapter.RowData.NO_KEY || rowData == CacheDataRowAdapter.RowData.LINK_ONLY) + return MvccDataRow.removedRowNoKey(link, partId, cacheId, crdVer, mvccCntr); else rowData = CacheDataRowAdapter.RowData.KEY_ONLY; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java index d1e90d4..8853d6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java @@ -50,15 +50,13 @@ public class DataRow extends CacheDataRowAdapter { this.part = part; try { - // We can not init data row lazily because underlying buffer can be concurrently cleared. - initFromLink(grp, rowData); + // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared. + if (rowData != RowData.LINK_ONLY) + initFromLink(grp, rowData); } catch (IgniteCheckedException e) { throw new IgniteException(e); } - - if (key != null) - key.partition(part); } /** @@ -84,11 +82,18 @@ public class DataRow extends CacheDataRowAdapter { /** * */ - protected DataRow() { + DataRow() { super(0); } /** {@inheritDoc} */ + @Override public void key(KeyCacheObject key) { + super.key(key); + + hash = key.hashCode(); + } + + /** {@inheritDoc} */ @Override public int partition() { return part; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java index 175cf72..a2cf079 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java @@ -33,6 +33,13 @@ public class MvccDataRow extends DataRow { private long mvccCntr; /** + * + */ + private MvccDataRow() { + // No-op. + } + + /** * @param grp Context. * @param hash Key hash. * @param link Link. @@ -51,13 +58,7 @@ public class MvccDataRow extends DataRow { } /** - * - */ - private MvccDataRow() { - // No-op. - } - - /** + * @param link Link. * @param part Partition. * @param cacheId Cache ID. * @param crdVer Mvcc coordinator version. @@ -65,12 +66,14 @@ public class MvccDataRow extends DataRow { * @return Row. */ static MvccDataRow removedRowNoKey( + long link, int part, int cacheId, long crdVer, long mvccCntr) { MvccDataRow row = new MvccDataRow(); + row.link = link; row.cacheId = cacheId; row.part = part; row.crdVer = crdVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java index 90de16f..0b37a94 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java @@ -66,7 +66,9 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C * @param key Key. * @param val Value. * @param ver Version. + * @param expireTime Expire time. * @param mvccVer Mvcc version. + * @param needOld {@code True} if need previous value. * @param part Partition. * @param cacheId Cache ID. */ @@ -175,9 +177,6 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C if (needOld) oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY); } - - res = versionForRemovedValue(rowCrdVerMasked) ? - UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index ced6dfe..1949cd2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -161,6 +161,71 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { } /** + * @param cfgC Optional closure applied to cache configuration. + * @throws Exception If failed. + */ + final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception { + Ignite srv0 = startGrid(0); + + final int PARTS = 64; + + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + if (cfgC != null) + cfgC.apply(ccfg); + + IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 1; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestAccount(v, 1)); + + assertEquals(vals - 1, cache.get(k).val); + } + + srv0.destroyCache(cache.getName()); + + ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + if (cfgC != null) + cfgC.apply(ccfg); + + cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 2; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestAccount(v + 100, 1)); + + assertEquals(vals - 1 + 100, cache.get(k).val); + } + + srv0.destroyCache(cache.getName()); + + ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS); + + IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg); + + for (long k = 0; k < PARTS * 2; k++) { + assertNull(cache0.get(k)); + + int vals = (int)(k % 3 + 2); + + for (long v = 0; v < vals; v++) + cache0.put(k, v); + + assertEquals((long)(vals - 1), (Object)cache0.get(k)); + } + } + + /** * @param srvs Number of server nodes. * @param clients Number of client nodes. * @param cacheBackups Number of cache backups. http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index dbe4ce5..df9f21e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -36,26 +36,24 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.cache.expiry.Duration; +import javax.cache.expiry.TouchedExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; @@ -275,6 +273,13 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testCacheRecreate() throws Exception { + cacheRecreate(null); + } + + /** + * @throws Exception If failed. + */ public void testActiveQueriesCleanup() throws Exception { activeQueriesCleanup(false); } @@ -3626,6 +3631,67 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest { checkRow(cctx, row, key0, vers.get(v + 1).get1()); } } + + KeyCacheObject key = cctx.toCacheKeyObject(KEYS); + + cache.put(key, 0); + + cache.remove(key); + + cctx.offheap().mvccRemoveAll((GridCacheMapEntry)cctx.cache().entryEx(key)); + } + + /** + * @throws Exception If failed. + */ + public void testExpiration() throws Exception { + final IgniteEx node = startGrid(0); + + IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64)); + + final IgniteCache expiryCache = + cache.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.SECONDS, 1))); + + for (int i = 0; i < 10; i++) + expiryCache.put(1, i); + + assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expiryCache.localPeek(1) == null; + } + }, 5000)); + + for (int i = 0; i < 11; i++) { + if (i % 2 == 0) + expiryCache.put(1, i); + else + expiryCache.remove(1); + } + + assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expiryCache.localPeek(1) == null; + } + }, 5000)); + + expiryCache.put(1, 1); + + assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + GridCacheContext cctx = node.context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)); + + KeyCacheObject key = cctx.toCacheKeyObject(1); + + return cctx.offheap().read(cctx, key) == null; + } + catch (Exception e) { + fail(); + + return false; + } + } + }, 5000)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java index e2f6b2e..f6ab5ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java @@ -18,11 +18,14 @@ package org.apache.ignite.internal.processors.database; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; @@ -218,6 +221,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** * @throws IgniteCheckedException If failed. */ + public void testFindWithClosure() throws IgniteCheckedException { + TestTree tree = createTestTree(true); + TreeMap<Long, Long> map = new TreeMap<>(); + + long size = CNT * CNT; + + for (long i = 1; i <= size; i++) { + tree.put(i); + map.put(i, i); + } + + checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.<Long>emptySet()), null), + Collections.<Long>emptyList().iterator()); + + checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(map.keySet()), null), + map.values().iterator()); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 100; i++) { + Long val = rnd.nextLong(size) + 1; + + checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.singleton(val)), null), + Collections.singleton(val).iterator()); + } + + for (int i = 0; i < 200; i++) { + long vals = rnd.nextLong(size) + 1; + + TreeSet<Long> exp = new TreeSet<>(); + + for (long k = 0; k < vals; k++) + exp.add(rnd.nextLong(size) + 1); + + checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); + + checkCursor(tree.find(0L, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); + + checkCursor(tree.find(0L, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); + + checkCursor(tree.find(null, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator()); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ public void _testBenchInvoke() throws IgniteCheckedException { MAX_PER_PAGE = 10; @@ -625,12 +675,12 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** - * @param tree - * @param lower - * @param upper - * @param exp - * @param expFound - * @throws IgniteCheckedException + * @param tree Tree. + * @param lower Lower bound. + * @param upper Upper bound. + * @param exp Value to find. + * @param expFound {@code True} if value should be found. + * @throws IgniteCheckedException If failed. */ private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound) throws IgniteCheckedException { @@ -641,6 +691,14 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { assertEquals(expFound, c.found); } + /** + * @param tree Tree. + * @param lower Lower bound. + * @param upper Upper bound. + * @param c Closure. + * @param expFound {@code True} if value should be found. + * @throws IgniteCheckedException If failed. + */ private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound) throws IgniteCheckedException { c.found = false; @@ -1307,7 +1365,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testIterateConcurrentPutRemove() throws Exception { - findOneBoundedConcurrentPutRemove(); + iterateConcurrentPutRemove(); } /** @@ -1316,7 +1374,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { public void testIterateConcurrentPutRemove_1() throws Exception { MAX_PER_PAGE = 1; - findOneBoundedConcurrentPutRemove(); + iterateConcurrentPutRemove(); } /** @@ -1325,7 +1383,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { public void testIterateConcurrentPutRemove_5() throws Exception { MAX_PER_PAGE = 5; - findOneBoundedConcurrentPutRemove(); + iterateConcurrentPutRemove(); } /** @@ -1334,13 +1392,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { public void testIteratePutRemove_10() throws Exception { MAX_PER_PAGE = 10; - findOneBoundedConcurrentPutRemove(); + iterateConcurrentPutRemove(); } /** * @throws Exception If failed. */ - private void findOneBoundedConcurrentPutRemove() throws Exception { + private void iterateConcurrentPutRemove() throws Exception { final TestTree tree = createTestTree(true); final int KEYS = 10_000; @@ -1474,7 +1532,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { } /** - * + * @throws Exception If failed. */ public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception { //calculate tree size when split happens @@ -2132,6 +2190,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { /** */ private Long val; + /** {@inheritDoc} */ @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException { @@ -2142,4 +2201,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest { return false; } } + + /** + * + */ + static class TestTreeFindFilteredClosure implements BPlusTree.TreeRowClosure<Long, Long> { + /** */ + private final Set<Long> vals; + + /** + * @param vals Values to allow in filter. + */ + TestTreeFindFilteredClosure(Set<Long> vals) { + this.vals = vals; + } + + /** {@inheritDoc} */ + @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx) + throws IgniteCheckedException { + Long val = io.getLookupRow(tree, pageAddr, idx); + + return vals.contains(val); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index ff9c1da..2359ae1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1549,7 +1549,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (partitions == null && twoStepQry.derivedPartitions() != null) { try { partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), args); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" + Arrays.deepToString(args) + "]", e); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java index 409c137..e9ec9e6 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java @@ -75,7 +75,13 @@ public class H2RowFactory { * @throws IgniteCheckedException If failed. */ public GridH2Row getMvccRow(long link, long mvccCrdVer, long mvccCntr) throws IgniteCheckedException { - MvccDataRow row = new MvccDataRow(cctx.group(), 0, link, 0, null, mvccCrdVer, mvccCntr); + MvccDataRow row = new MvccDataRow(cctx.group(), + 0, + link, + -1, // TODO IGNITE-3478: get partition from link. + null, + mvccCrdVer, + mvccCntr); return rowDesc.createRow(row, null); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index cdaa5b0..877bd7f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -177,7 +177,7 @@ public class H2TreeIndex extends GridH2IndexBase { GridH2QueryContext qctx = GridH2QueryContext.get(); if (qctx != null) { - IndexingQueryFilter f = threadLocalFilter(); + IndexingQueryFilter f = qctx.filter(); if (f != null) { String cacheName = getTable().cacheName(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d381cdec/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index 4ba0fcd..9ac7d21 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -283,6 +283,146 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testCacheRecreate() throws Exception { + cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class)); + } + + /** + * @throws Exception If failed. + */ + public void testCacheRecreateChangeIndexedType() throws Exception { + Ignite srv0 = startGrid(0); + + final int PARTS = 64; + + { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS). + setIndexedTypes(Integer.class, MvccTestAccount.class); + + IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 1; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestAccount(v, 1)); + + assertEquals(vals - 1, cache.get(k).val); + } + + assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestAccount.class, "true")).getAll().size()); + + srv0.destroyCache(cache.getName()); + } + + { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS). + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class); + + IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get(k)); + + int vals = k % 3 + 1; + + for (int v = 0; v < vals; v++) + cache.put(k, new MvccTestSqlIndexValue(v)); + + assertEquals(vals - 1, cache.get(k).idxVal1); + } + + assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true")).getAll().size()); + + srv0.destroyCache(cache.getName()); + } + + { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS). + setIndexedTypes(Long.class, Long.class); + + IgniteCache<Long, Long> cache = (IgniteCache)srv0.createCache(ccfg); + + for (int k = 0; k < PARTS * 2; k++) { + assertNull(cache.get((long)k)); + + int vals = k % 3 + 1; + + for (int v = 0; v < vals; v++) + cache.put((long)k, (long)v); + + assertEquals((long)(vals - 1), (Object)cache.get((long)k)); + } + + assertEquals(PARTS * 2, cache.query(new SqlQuery<>(Long.class, "true")).getAll().size()); + + srv0.destroyCache(cache.getName()); + } + } + + /** + * @throws Exception If failed. + */ + public void testChangeValueType1() throws Exception { + Ignite srv0 = startGrid(0); + + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class); + + IgniteCache<Object, Object> cache = srv0.createCache(ccfg); + + cache.put(1, new MvccTestSqlIndexValue(1)); + cache.put(1, new MvccTestSqlIndexValue(2)); + + checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2); + + cache.put(1, 1); + + assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size()); + + checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1); + + cache.put(1, 2); + + checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2); + } + + /** + * @throws Exception If failed. + */ + public void testChangeValueType2() throws Exception { + Ignite srv0 = startGrid(0); + + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class); + + IgniteCache<Object, Object> cache = srv0.createCache(ccfg); + + cache.put(1, new MvccTestSqlIndexValue(1)); + cache.put(1, new MvccTestSqlIndexValue(2)); + + checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2); + + cache.remove(1); + + assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size()); + + cache.put(1, 1); + + assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size()); + + checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1); + + cache.put(1, 2); + + checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2); + } + + /** + * @throws Exception If failed. + */ public void testCountTransactional_SingleNode() throws Exception { countTransactional(true); }
