http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index a869b21..4a98f6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridAtomicLong; +import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.lang.GridIterator; @@ -82,6 +83,12 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public Iterable<CacheDataStore> cacheDataStores(); /** + * @param part Partition. + * @return Data store. + */ + public CacheDataStore dataStore(GridDhtLocalPartition part); + + /** * @param p Partition ID. * @param store Data store. */ @@ -107,6 +114,15 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { public long expiredSize() throws IgniteCheckedException; /** + * @param key Key. + * @param part Partition. + * @param c Tree update closure. + * @throws IgniteCheckedException If failed. + */ + public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) + throws IgniteCheckedException; + + /** * @param key Key. * @param val Value. * @param ver Version. @@ -253,6 +269,16 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { /** * */ + interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> { + /** + * @return Old row. + */ + @Nullable public CacheDataRow oldRow(); + } + + /** + * + */ interface CacheDataStore { /** * @return Partition ID. @@ -297,6 +323,21 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { /** * @param key Key. + * @param val Value. + * @param ver Version. + * @param expireTime Expire time. + * @param oldRow Old row. + * @return New row. + * @throws IgniteCheckedException If failed. + */ + CacheDataRow createRow(KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException; + + /** + * @param key Key. * @param part Partition. * @param val Value. * @param ver Version. @@ -313,6 +354,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { /** * @param key Key. + * @param c Closure. + * @throws IgniteCheckedException If failed. + */ + public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException; + + /** + * @param key Key. * @param partId Partition number. * @throws IgniteCheckedException If failed. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 5df99b6..b863edd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -207,7 +207,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @param part Partition. * @return Data store for given entry. */ - private CacheDataStore dataStore(GridDhtLocalPartition part) { + public CacheDataStore dataStore(GridDhtLocalPartition part) { if (cctx.isLocal()) return locCacheDataStore; else { @@ -329,6 +329,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Override public void invoke(KeyCacheObject key, + GridDhtLocalPartition part, + OffheapInvokeClosure c) + throws IgniteCheckedException { + dataStore(part).invoke(key, c); + } + + /** {@inheritDoc} */ @Override public void update( KeyCacheObject key, CacheObject val, @@ -838,6 +846,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple protected Long initCntr = 0L; /** + * @param partId Partition number. * @param name Name. * @param rowStore Row store. * @param dataTree Data tree. @@ -900,6 +909,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (oldRow == null || indexingEnabled) return false; + if (oldRow.expireTime() != dataRow.expireTime()) + return false; + CacheObjectContext coCtx = cctx.cacheObjectContext(); int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx); @@ -913,6 +925,71 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) + throws IgniteCheckedException { + if (!busyLock.enterBusy()) + throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); + + try { + dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c); + + switch (c.operationType()) { + case PUT: { + assert c.newRow() != null : c; + + CacheDataRow oldRow = c.oldRow(); + + finishUpdate(c.newRow(), oldRow); + + break; + } + + case REMOVE: { + CacheDataRow oldRow = c.oldRow(); + + finishRemove(key, oldRow); + + break; + } + + case NOOP: + break; + + default: + assert false : c.operationType(); + } + } + finally { + busyLock.leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public CacheDataRow createRow(KeyCacheObject key, + CacheObject val, + GridCacheVersion ver, + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException + { + DataRow dataRow = new DataRow(key, val, ver, partId, expireTime); + + if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) + dataRow.link(oldRow.link()); + else { + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + key.valueBytes(coCtx); + val.valueBytes(coCtx); + + rowStore.addRow(dataRow); + } + + assert dataRow.link() != 0 : dataRow; + + return dataRow; + } + + /** {@inheritDoc} */ @Override public void update(KeyCacheObject key, int p, CacheObject val, @@ -935,14 +1012,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheDataRow old; - boolean rmvOld = true; - if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { old = oldRow; dataRow.link(oldRow.link()); - - rmvOld = false; } else { rowStore.addRow(dataRow); @@ -956,43 +1029,68 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } else old = dataTree.put(dataRow); - - if (old == null) - storageSize.incrementAndGet(); } - if (indexingEnabled) { - GridCacheQueryManager qryMgr = cctx.queries(); + finishUpdate(dataRow, old); + } + finally { + busyLock.leaveBusy(); + } + } - assert qryMgr.enabled(); + /** + * @param newRow New row. + * @param oldRow Old row if available. + * @throws IgniteCheckedException If failed. + */ + private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + if (oldRow == null) + storageSize.incrementAndGet(); - if (old != null) - qryMgr.store(key, p, old.value(), old.version(), val, ver, expireTime, dataRow.link()); - else - qryMgr.store(key, p, null, null, val, ver, expireTime, dataRow.link()); - } + KeyCacheObject key = newRow.key(); + + long expireTime = newRow.expireTime(); - if (old != null) { - assert old.link() != 0 : old; + if (indexingEnabled) { + GridCacheQueryManager qryMgr = cctx.queries(); - if (pendingEntries != null && old.expireTime() != 0) - pendingEntries.removex(new PendingRow(old.expireTime(), old.link())); + assert qryMgr.enabled(); - if (rmvOld) - rowStore.removeRow(old.link()); + if (oldRow != null) { + qryMgr.store(key, + partId, + oldRow.value(), oldRow.version(), + newRow.value(), newRow.version(), + expireTime, + newRow.link()); + } + else { + qryMgr.store(key, + partId, + null, null, + newRow.value(), newRow.version(), + expireTime, + newRow.link()); } + } - if (pendingEntries != null && expireTime != 0) { - pendingEntries.putx(new PendingRow(expireTime, dataRow.link())); + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; - hasPendingEntries = true; - } + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); - updateIgfsMetrics(key, (old != null ? old.value() : null), val); + if (newRow.link() != oldRow.link()) + rowStore.removeRow(oldRow.link()); } - finally { - busyLock.leaveBusy(); + + if (pendingEntries != null && expireTime != 0) { + pendingEntries.putx(new PendingRow(expireTime, newRow.link())); + + hasPendingEntries = true; } + + updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value()); } /** {@inheritDoc} */ @@ -1001,50 +1099,59 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - CacheDataRow dataRow = dataTree.remove(new SearchRow(key)); - - CacheObject val = null; - GridCacheVersion ver = null; + CacheDataRow oldRow = dataTree.remove(new SearchRow(key)); - if (dataRow != null) { - assert dataRow.link() != 0 : dataRow; + finishRemove(key, oldRow); + } + finally { + busyLock.leaveBusy(); + } + } - if (pendingEntries != null && dataRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(dataRow.expireTime(), dataRow.link())); + /** + * @param key Key. + * @param oldRow Removed row. + * @throws IgniteCheckedException If failed. + */ + private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + CacheObject val = null; + GridCacheVersion ver = null; - storageSize.decrementAndGet(); + if (oldRow != null) { + assert oldRow.link() != 0 : oldRow; - val = dataRow.value(); + if (pendingEntries != null && oldRow.expireTime() != 0) + pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); - ver = dataRow.version(); - } + storageSize.decrementAndGet(); - if (indexingEnabled) { - GridCacheQueryManager qryMgr = cctx.queries(); + val = oldRow.value(); - assert qryMgr.enabled(); + ver = oldRow.version(); + } - qryMgr.remove(key, partId, val, ver); - } + if (indexingEnabled) { + GridCacheQueryManager qryMgr = cctx.queries(); - if (dataRow != null) - rowStore.removeRow(dataRow.link()); + assert qryMgr.enabled(); - updateIgfsMetrics(key, (dataRow != null ? dataRow.value() : null), null); - } - finally { - busyLock.leaveBusy(); + qryMgr.remove(key, partId, val, ver); } + + if (oldRow != null) + rowStore.removeRow(oldRow.link()); + + updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null); } /** {@inheritDoc} */ @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { key.valueBytes(cctx.cacheObjectContext()); - CacheDataRow row = dataTree.findOne(new SearchRow(key), dataTree.noKeyC); + CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY); if (row != null) - ((CacheDataRowAdapter)row).key(key); + row.key(key); return row; } @@ -1261,17 +1368,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** */ private final GridCacheContext cctx; - /** */ - private final RowClosure<CacheSearchRow, CacheDataRow> noKeyC = new RowClosure<CacheSearchRow, CacheDataRow>() { - @Override public CacheDataRow row(BPlusIO<CacheSearchRow> io, long pageAddr, int idx) - throws IgniteCheckedException { - int hash = ((RowLinkIO)io).getHash(pageAddr, idx); - long link = ((RowLinkIO)io).getLink(pageAddr, idx); - - return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.NO_KEY); - } - }; - /** * @param name Tree name. * @param reuseList Reuse list. @@ -1320,12 +1416,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx) + @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags) throws IgniteCheckedException { int hash = ((RowLinkIO)io).getHash(pageAddr, idx); long link = ((RowLinkIO)io).getLink(pageAddr, idx); - return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.FULL); + CacheDataRowAdapter.RowData x = flags != null ? + (CacheDataRowAdapter.RowData)flags : + CacheDataRowAdapter.RowData.FULL; + + return rowStore.dataRow(hash, link, x); } /** @@ -1705,7 +1805,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx) + @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore) throws IgniteCheckedException { return io.getLookupRow(this, pageAddr, idx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java index 75ab8e4..cc26b21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.database; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -48,4 +49,9 @@ public interface CacheDataRow extends CacheSearchRow { * @param link Link for this row. */ public void link(long link); + + /** + * @param key Key. + */ + public void key(KeyCacheObject key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index 4bfdd99..5a62e75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** + * @param key Key. + * @param val Value. + * @param expireTime Expire time. + * @param ver Version. + */ + public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime) { + this.key = key; + this.val = val; + this.ver = ver; + this.expireTime = expireTime; + } + + /** * Read row from data pages. * * @param cctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java index 47c3254..ca4ad05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java @@ -213,7 +213,7 @@ public class MetadataStorage implements MetaStore { /** {@inheritDoc} */ @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr, - final int idx) throws IgniteCheckedException { + final int idx, Object ignore) throws IgniteCheckedException { return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index aa61fbd..8827407 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -120,20 +120,6 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ private volatile TreeMetaData treeMeta; - /** - * - */ - public static interface RowClosure<L, R> { - /** - * @param io IO. - * @param pageAddr Page address. - * @param idx Index. - * @return Result. - * @throws IgniteCheckedException If failed. - */ - public R row(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException; - } - /** */ private final GridTreePrinter<Long> treePrinter = new GridTreePrinter<Long>() { /** */ @@ -224,22 +210,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements long res = doAskNeighbor(io, pageAddr, back); if (back) { - assert g.getClass() == Remove.class; - if (io.getForward(pageAddr) != g.backId) // See how g.backId is setup in removeDown for this check. return RETRY; - g.backId = res; + g.backId(res); } else { assert isBack == FALSE.ordinal() : isBack; - g.fwdId = res; + g.fwdId(res); } return FOUND; } - }; + } /** */ private final GetPageHandler<Get> search = new Search(); @@ -257,7 +241,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements boolean needBackIfRouting = g.backId != 0; - g.backId = 0; // Usually we'll go left down and don't need it. + g.backId(0L); // Usually we'll go left down and don't need it. int cnt = io.getCount(pageAddr); int idx = findInsertionPoint(io, pageAddr, 0, cnt, g.row, g.shift); @@ -282,13 +266,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements assert !io.isLeaf() : io; // If idx == cnt then we go right down, else left down: getLeft(cnt) == getRight(cnt - 1). - g.pageId = inner(io).getLeft(pageAddr, idx); + g.pageId(inner(io).getLeft(pageAddr, idx)); // If we see the tree in consistent state, then our right down page must be forward for our left down page, // we need to setup fwdId and/or backId to be able to check this invariant on lower level. if (idx < cnt) { // Go left down here. - g.fwdId = inner(io).getRight(pageAddr, idx); + g.fwdId(inner(io).getRight(pageAddr, idx)); } else { // Go right down here or it is an empty branch. @@ -301,7 +285,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // Setup fwdId. if (fwdId == 0) - g.fwdId = 0; + g.fwdId(0L); else { // We can do askNeighbor on forward page here because we always take locks in forward direction. Result res = askNeighbor(fwdId, g, false); @@ -312,7 +296,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // Setup backId. if (cnt != 0) // It is not a routing page and we are going to the right, can get backId here. - g.backId = inner(io).getLeft(pageAddr, cnt - 1); + g.backId(inner(io).getLeft(pageAddr, cnt - 1)); else if (needBackIfRouting) { // Can't get backId here because of possible deadlock and it is only needed for remove operation. return GO_DOWN_X; @@ -321,7 +305,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return GO_DOWN; } - }; + } /** */ private final GetPageHandler<Put> replace = new Replace(); @@ -331,6 +315,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private class Replace extends GetPageHandler<Put> { /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public Result run0(Page page, long pageAddr, BPlusIO<L> io, Put p, int lvl) throws IgniteCheckedException { // Check the triangle invariant. @@ -351,15 +336,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // Detach the old row if we are on a leaf page. if (lvl == 0) { - assert p.oldRow == null; + assert p.oldRow == null; // The old row must be set only once. + + // Inner replace state must be consistent by the end of the operation. + assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner; + + // Need to replace inner key if now we are replacing the rightmost row and have a forward page. + if (canGetRowFromInner && idx + 1 == cnt && p.fwdId != 0L && p.needReplaceInner == FALSE) { + // Can happen only for invoke, otherwise inner key must be replaced on the way down. + assert p.invoke != null; + + // We need to restart the operation from root to perform inner replace. + // On the second pass we will not get here (will avoid infinite loop) because + // needReplaceInner will be DONE or our key will not be the rightmost anymore. + return RETRY_ROOT; + } // Get old row in leaf page to reduce contention at upper level. p.oldRow = p.needOld ? getRow(io, pageAddr, idx) : (T)Boolean.TRUE; p.finish(); - - // Inner replace state must be consistent by the end of the operation. - assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner; } boolean needWal = needWalDeltaRecord(page); @@ -371,7 +367,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return FOUND; } - }; + } /** */ private final GetPageHandler<Put> insert = new Insert(); @@ -405,6 +401,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements p.btmLvl++; // Get high. p.row = moveUpRow; + if (p.invoke != null) + p.invoke.row = moveUpRow; + // Here forward page can't be concurrently removed because we keep write lock on tail which is the only // page who knows about the forward page, because it was just produced by split. p.rightId = io.getForward(pageAddr); @@ -417,7 +416,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return FOUND; } - }; + } /** */ private final GetPageHandler<Remove> rmvFromLeaf = new RemoveFromLeaf(); @@ -437,15 +436,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements int idx = findInsertionPoint(io, pageAddr, 0, cnt, r.row, 0); - if (idx < 0) { - if (!r.ceil) // We've found exact match on search but now it's gone. - return RETRY; - - idx = fix(idx); - - if (idx == cnt) // We can not remove ceiling row here. Bad luck. - return NOT_FOUND; - } + if (idx < 0) + return RETRY; // We've found exact match on search but now it's gone. assert idx >= 0 && idx < cnt: idx; @@ -495,7 +487,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return FOUND; } - }; + } /** */ private final GetPageHandler<Remove> lockBackAndRmvFromLeaf = new LockBackAndRmvFromLeaf(); @@ -520,7 +512,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return res; } - }; + } /** */ private final GetPageHandler<Remove> lockBackAndTail = new LockBackAndTail(); @@ -544,7 +536,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return res; } - }; + } /** */ private final GetPageHandler<Remove> lockTailForward = new LockTailForward(); @@ -560,7 +552,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return FOUND; } - }; + } /** */ private final GetPageHandler<Remove> lockTail = new LockTail(); @@ -590,7 +582,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return FOUND; } - }; + } /** */ private final PageHandler<Void, Bool> cutRoot = new CutRoot(); @@ -620,7 +612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return TRUE; } - }; + } /** */ private final PageHandler<Long, Bool> addRoot = new AddRoot(); @@ -651,7 +643,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return TRUE; } - }; + } /** */ private final PageHandler<Long, Bool> initRoot = new InitRoot(); @@ -681,7 +673,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return TRUE; } - }; + } /** * @param name Tree name. @@ -947,15 +939,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param row Lookup row for exact match. - * @param c Found row closure. - * @return Found result. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Found result or {@code null}. * @throws IgniteCheckedException If failed. */ - public final <R> R findOne(L row, RowClosure<L, R> c) throws IgniteCheckedException { + @SuppressWarnings("unchecked") + public final <R> R findOne(L row, Object x) throws IgniteCheckedException { checkDestroyed(); try { - GetOne g = new GetOne(row, c); + GetOne g = new GetOne(row, x); doFind(g); @@ -1036,7 +1029,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements case RETRY: checkInterrupted(); - continue; // The child page got splitted, need to reread our page. + continue; // The child page got split, need to reread our page. default: return res; @@ -1437,48 +1430,183 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Removed row. * @throws IgniteCheckedException If failed. */ - @SuppressWarnings("unused") - public final T removeCeil(L row) throws IgniteCheckedException { - return doRemove(row, true, true); + @Override public final T remove(L row) throws IgniteCheckedException { + return doRemove(row, true); } /** * @param row Lookup row. - * @return Removed row. * @throws IgniteCheckedException If failed. + * @return {@code True} if removed row. */ - @Override public final T remove(L row) throws IgniteCheckedException { - return doRemove(row, false, true); + public final boolean removex(L row) throws IgniteCheckedException { + Boolean res = (Boolean)doRemove(row, false); + + return res != null ? res : false; + } + + /** {@inheritDoc} */ + @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws IgniteCheckedException { + checkDestroyed(); + + Invoke x = new Invoke(row, z, c); + + try { + for (;;) { + x.init(); + + Result res = invokeDown(x, x.rootId, 0L, 0L, x.rootLvl); + + switch (res) { + case RETRY: + case RETRY_ROOT: + checkInterrupted(); + + continue; + + default: + if (!x.isFinished()) { + res = x.tryFinish(); + + if (res == RETRY || res == RETRY_ROOT) { + checkInterrupted(); + + continue; + } + + assert x.isFinished(): res; + } + + return; + } + } + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Runtime failure on search row: " + row, e); + } + catch (RuntimeException e) { + throw new IgniteException("Runtime failure on search row: " + row, e); + } + catch (AssertionError e) { + throw new AssertionError("Assertion error on search row: " + row, e); + } + finally { + x.releaseAll(); + } } /** - * @param row Lookup row. + * @param x Invoke operation. + * @param pageId Page ID. + * @param backId Expected backward page ID if we are going to the right. + * @param fwdId Expected forward page ID. + * @param lvl Level. + * @return Result code. * @throws IgniteCheckedException If failed. - * @return {@code True} if removed row. */ - public final boolean removex(L row) throws IgniteCheckedException { - Boolean res = (Boolean)doRemove(row, false, false); + private Result invokeDown(final Invoke x, final long pageId, final long backId, final long fwdId, final int lvl) + throws IgniteCheckedException { + assert lvl >= 0 : lvl; - return res != null ? res : false; + if (x.isTail(pageId, lvl)) + return FOUND; // We've already locked this page, so return that we are ok. + + final Page page = page(pageId); + + try { + for (;;) { + // Init args. + x.pageId(pageId); + x.fwdId(fwdId); + x.backId(backId); + + Result res = readPage(page, this, search, x, lvl, RETRY); + + switch (res) { + case GO_DOWN_X: + assert backId != 0; + assert x.backId == 0; // We did not setup it yet. + + x.backId(pageId); // Dirty hack to setup a check inside of askNeighbor. + + // We need to get backId here for our child page, it must be the last child of our back. + res = askNeighbor(backId, x, true); + + if (res != FOUND) + return res; // Retry. + + assert x.backId != pageId; // It must be updated in askNeighbor. + + // Intentional fallthrough. + case GO_DOWN: + res = x.tryReplaceInner(page, pageId, fwdId, lvl); + + if (res != RETRY) + res = invokeDown(x, x.pageId, x.backId, x.fwdId, lvl - 1); + + if (res == RETRY_ROOT || x.isFinished()) + return res; + + if (res == RETRY) { + checkInterrupted(); + + continue; + } + + // Unfinished Put does insertion on the same level. + if (x.isPut()) + continue; + + assert x.isRemove(); // Guarded by isFinished. + + res = x.finishOrLockTail(page, pageId, backId, fwdId, lvl); + + return res; + + case NOT_FOUND: + if (lvl == 0) + x.invokeClosure(); + + return x.onNotFound(page, pageId, fwdId, lvl); + + case FOUND: + if (lvl == 0) + x.invokeClosure(); + + return x.onFound(page, pageId, backId, fwdId, lvl); + + default: + return res; + } + } + } + finally { + x.levelExit(); + + if (x.canRelease(page, lvl)) + page.close(); + } } + /** * @param row Lookup row. - * @param ceil If we can remove ceil row when we can not find exact. * @param needOld {@code True} if need return removed row. * @return Removed row. * @throws IgniteCheckedException If failed. */ - private T doRemove(L row, boolean ceil, boolean needOld) throws IgniteCheckedException { + private T doRemove(L row, boolean needOld) throws IgniteCheckedException { checkDestroyed(); - Remove r = new Remove(row, ceil, needOld); + Remove r = new Remove(row, needOld); try { for (;;) { r.init(); - switch (removeDown(r, r.rootId, 0L, 0L, r.rootLvl)) { + Result res = removeDown(r, r.rootId, 0L, 0L, r.rootLvl); + + switch (res) { case RETRY: case RETRY_ROOT: checkInterrupted(); @@ -1487,15 +1615,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements default: if (!r.isFinished()) { - Result res = r.finishTail(); + res = r.finishTail(); // If not found, then the tree grew beyond our call stack -> retry from the actual root. if (res == RETRY || res == NOT_FOUND) { - int root = getRootLevel(); - - boolean checkRes = r.checkTailLevel(root); - - assert checkRes : "tail=" + r.tail + ", root=" + root + ", res=" + res; + assert r.checkTailLevel(getRootLevel()) : "tail=" + r.tail + ", res=" + res; checkInterrupted(); @@ -1521,9 +1645,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements throw new AssertionError("Assertion error on search row: " + row, e); } finally { - r.releaseTail(); - - r.reuseFreePages(); + r.releaseAll(); } } @@ -1579,12 +1701,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements continue; } - if (res != RETRY_ROOT && !r.isFinished()) { - res = r.finishTail(); + if (res == RETRY_ROOT || r.isFinished()) + return res; - if (res == NOT_FOUND) - res = r.lockTail(pageId, page, backId, fwdId, lvl); - } + res = r.finishOrLockTail(page, pageId, backId, fwdId, lvl); return res; @@ -1592,31 +1712,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements // We are at the bottom. assert lvl == 0 : lvl; - if (!r.ceil) { - r.finish(); - - return res; - } + r.finish(); - // Intentional fallthrough for ceiling remove. + return res; case FOUND: - // We must be at the bottom here, just need to remove row from the current page. - assert lvl == 0 : lvl; - - res = r.removeFromLeaf(pageId, page, backId, fwdId); - - if (res == NOT_FOUND) { - assert r.ceil : "must be a retry if not a ceiling remove"; - - r.finish(); - } - else if (res == FOUND && r.tail == null) { - // Finish if we don't need to do any merges. - r.finish(); - } - - return res; + return r.tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl); default: return res; @@ -1716,7 +1817,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * {@inheritDoc} */ @Override public final T put(T row) throws IgniteCheckedException { - return put(row, true); + return doPut(row, true); } /** @@ -1725,7 +1826,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return {@code True} if replaced existing row. */ public boolean putx(T row) throws IgniteCheckedException { - Boolean res = (Boolean)put(row, false); + Boolean res = (Boolean)doPut(row, false); return res != null ? res : false; } @@ -1736,7 +1837,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Old row. * @throws IgniteCheckedException If failed. */ - private T put(T row, boolean needOld) throws IgniteCheckedException { + private T doPut(T row, boolean needOld) throws IgniteCheckedException { checkDestroyed(); Put p = new Put(row, needOld); @@ -1858,7 +1959,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @return {@code True} if state was changed. */ - private final boolean markDestroyed() { + private boolean markDestroyed() { return destroyed.compareAndSet(false, true); } @@ -1997,31 +2098,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements assert p.pageId != pageId; assert p.fwdId != fwdId || fwdId == 0; - // Need to replace key in inner page. There is no race because we keep tail lock after split. - if (p.needReplaceInner == TRUE) { - p.needReplaceInner = FALSE; // Protect from retries. - - long oldFwdId = p.fwdId; - long oldPageId = p.pageId; - - // Set old args. - p.fwdId = fwdId; - p.pageId = pageId; - - res = writePage(pageMem, page, this, replace, p, lvl, RETRY); + res = p.tryReplaceInner(page, pageId, fwdId, lvl); - // Restore args. - p.pageId = oldPageId; - p.fwdId = oldFwdId; - - if (res != FOUND) - return res; // Need to retry. - - p.needReplaceInner = DONE; // We can have only single matching inner key. - } - - // Go down recursively. - res = putDown(p, p.pageId, p.fwdId, lvl - 1); + if (res != RETRY) // Go down recursively. + res = putDown(p, p.pageId, p.fwdId, lvl - 1); if (res == RETRY_ROOT || p.isFinished()) return res; @@ -2034,21 +2114,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements case FOUND: // Do replace. assert lvl == 0 : "This replace can happen only at the bottom level."; - // Init args. - p.pageId = pageId; - p.fwdId = fwdId; - - return writePage(pageMem, page, this, replace, p, lvl, RETRY); + return p.tryReplace(page, pageId, fwdId, lvl); case NOT_FOUND: // Do insert. assert lvl == p.btmLvl : "must insert at the bottom level"; assert p.needReplaceInner == FALSE : p.needReplaceInner + " " + lvl; - // Init args. - p.pageId = pageId; - p.fwdId = fwdId; - - return writePage(pageMem, page, this, insert, p, lvl, RETRY); + return p.tryInsert(page, pageId, fwdId, lvl); default: return res; @@ -2096,29 +2168,32 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private abstract class Get { /** */ - protected long rmvId; + long rmvId; /** Starting point root level. May be outdated. Must be modified only in {@link Get#init()}. */ - protected int rootLvl; + int rootLvl; /** Starting point root ID. May be outdated. Must be modified only in {@link Get#init()}. */ - protected long rootId; + long rootId; /** */ - protected L row; + L row; /** In/Out parameter: Page ID. */ - protected long pageId; + long pageId; /** In/Out parameter: expected forward page ID. */ - protected long fwdId; + long fwdId; /** In/Out parameter: in case of right turn this field will contain backward page ID for the child. */ - protected long backId; + long backId; /** */ int shift; + /** If this operation is a part of invoke. */ + Invoke invoke; + /** * @param row Row. */ @@ -2129,6 +2204,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** + * @param g Other operation to copy from. + * @return {@code this}. + */ + final Get copyFrom(Get g) { + rmvId = g.rmvId; + rootLvl = g.rootLvl; + pageId = g.pageId; + fwdId = g.fwdId; + backId = g.backId; + shift = g.shift; + + return this; + } + + /** * Initialize operation. * * @throws IgniteCheckedException If failed. @@ -2146,7 +2236,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param rootLvl Root level. * @param rmvId Remove ID to be afraid of. */ - final void restartFromRoot(long rootId, int rootLvl, long rmvId) { + void restartFromRoot(long rootId, int rootLvl, long rmvId) { this.rootId = rootId; this.rootLvl = rootLvl; this.rmvId = rmvId; @@ -2188,6 +2278,34 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements boolean canRelease(Page page, int lvl) { return page != null; } + + /** + * @param backId Back page ID. + */ + void backId(long backId) { + this.backId = backId; + } + + /** + * @param pageId Page ID. + */ + void pageId(long pageId) { + this.pageId = pageId; + } + + /** + * @param fwdId Forward page ID. + */ + void fwdId(long fwdId) { + this.fwdId = fwdId; + } + + /** + * @return {@code true} If the operation is finished. + */ + boolean isFinished() { + throw new IllegalStateException(); + } } /** @@ -2195,25 +2313,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private final class GetOne extends Get { /** */ - private final RowClosure<L, ?> c; + Object x; /** * @param row Row. - * @param c Row closure. + * @param x Implementation specific argument. */ - private GetOne(L row, RowClosure<L, ?> c) { + private GetOne(L row, Object x) { super(row); - this.c = c; + this.x = x; } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException { // Check if we are on an inner page and can't get row from it. if (lvl != 0 && !canGetRowFromInner) return false; - row = c != null ? (L)c.row(io, pageAddr, idx) : getRow(io, pageAddr, idx); + row = getRow(io, pageAddr, idx, x); return true; } @@ -2224,7 +2343,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private final class GetCursor extends Get { /** */ - private ForwardCursor cursor; + ForwardCursor cursor; /** * @param lower Lower bound. @@ -2261,31 +2380,31 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements */ private final class Put extends Get { /** Right child page ID for split row. */ - private long rightId; + long rightId; /** Replaced row if any. */ - private T oldRow; + T oldRow; /** * This page is kept locked after split until insert to the upper level will not be finished. * It is needed because split row will be "in flight" and if we'll release tail, remove on * split row may fail. */ - private Page tail; + Page tail; /** */ - private long tailPageAddr; + long tailPageAddr; /** * Bottom level for insertion (insert can't go deeper). Will be incremented on split on each level. */ - private short btmLvl; + short btmLvl; /** */ Bool needReplaceInner = FALSE; /** */ - private final boolean needOld; + final boolean needOld; /** * @param row Row. @@ -2302,6 +2421,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (lvl == 0) // Leaf: need to stop. return true; + assert btmLvl == 0; // It can not be insert. + // If we can get full row from the inner page, we have to replace it with the new one. On the way down // we can not miss inner key even in presence of concurrent operations because of `triangle` invariant + // concurrent inner replace handling by retrying from root. @@ -2348,10 +2469,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements tail(null, 0L); } - /** - * @return {@code true} If finished. - */ - private boolean isFinished() { + /** {@inheritDoc} */ + @Override boolean isFinished() { return row == null; } @@ -2505,45 +2624,404 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } } } - } - /** - * Remove operation. - */ - private final class Remove extends Get implements ReuseBag { - /** */ - private boolean ceil; + /** + * @param page Page. + * @param pageId Page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl) + throws IgniteCheckedException { + // Need to replace key in inner page. There is no race because we keep tail lock after split. + if (needReplaceInner == TRUE) { + needReplaceInner = FALSE; // Protect from retries. - /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */ - private Tail<L> tail; + long oldFwdId = this.fwdId; + long oldPageId = this.pageId; - /** */ - Bool needReplaceInner = FALSE; + // Set old args. + this.fwdId = fwdId; + this.pageId = pageId; - /** */ - Bool needMergeEmptyBranch = FALSE; + Result res = writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY); - /** Removed row. */ - private T rmvd; + // Restore args. + this.pageId = oldPageId; + this.fwdId = oldFwdId; - /** Current page. */ - private Page page; + if (res == RETRY) + return RETRY; + + needReplaceInner = DONE; // We can have only a single matching inner key. + + return FOUND; + } + + return NOT_FOUND; + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result tryInsert(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException { + // Init args. + this.pageId = pageId; + this.fwdId = fwdId; + + return writePage(pageMem, page, BPlusTree.this, insert, this, lvl, RETRY); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + public Result tryReplace(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException { + // Init args. + this.pageId = pageId; + this.fwdId = fwdId; + + return writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY); + } + } + + /** + * Invoke operation. + */ + private final class Invoke extends Get { + /** */ + Object x; + + /** */ + InvokeClosure<T> clo; + + /** */ + Bool closureInvoked = FALSE; + + /** */ + T foundRow; + + /** */ + Get op; + + /** + * @param row Row. + * @param x Implementation specific argument. + * @param clo Closure. + */ + private Invoke(L row, Object x, final InvokeClosure<T> clo) { + super(row); + + assert clo != null; + + this.clo = clo; + this.x = x; + } + + /** {@inheritDoc} */ + @Override void pageId(long pageId) { + this.pageId = pageId; + + if (op != null) + op.pageId = pageId; + } + + /** {@inheritDoc} */ + @Override void fwdId(long fwdId) { + this.fwdId = fwdId; + + if (op != null) + op.fwdId = fwdId; + } + + /** {@inheritDoc} */ + @Override void backId(long backId) { + this.backId = backId; + + if (op != null) + op.backId = backId; + } + + /** {@inheritDoc} */ + @Override void restartFromRoot(long rootId, int rootLvl, long rmvId) { + super.restartFromRoot(rootId, rootLvl, rmvId); + + if (op != null) + op.restartFromRoot(rootId, rootLvl, rmvId); + } + + /** {@inheritDoc} */ + @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException { + // If the operation is initialized, then the closure has been called already. + if (op != null) + return op.found(io, pageAddr, idx, lvl); + + if (lvl == 0) { + if (closureInvoked == FALSE) { + closureInvoked = READY; + + foundRow = getRow(io, pageAddr, idx, x); + } + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException { + // If the operation is initialized, then the closure has been called already. + if (op != null) + return op.notFound(io, pageAddr, idx, lvl); + + if (lvl == 0) { + if (closureInvoked == FALSE) + closureInvoked = READY; + + return true; + } + + return false; + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void invokeClosure() throws IgniteCheckedException { + if (closureInvoked != READY) + return; + + closureInvoked = DONE; + + clo.call(foundRow); + + switch (clo.operationType()) { + case PUT: + T newRow = clo.newRow(); + + assert newRow != null; + + op = new Put(newRow, false); + + break; + + case REMOVE: + assert foundRow != null; + + op = new Remove(row, false); + + break; + + case NOOP: + return; + + default: + throw new IllegalStateException(); + } + + op.copyFrom(this); + + op.invoke = this; + } + + /** {@inheritDoc} */ + @Override boolean canRelease(Page page, int lvl) { + if (page == null) + return false; + + if (op == null) + return true; + + return op.canRelease(page, lvl); + } + + /** + * @return {@code true} If it is a {@link Put} operation internally. + */ + private boolean isPut() { + return op != null && op.getClass() == Put.class; + } + + /** + * @return {@code true} If it is a {@link Remove} operation internally. + */ + private boolean isRemove() { + return op != null && op.getClass() == Remove.class; + } + + /** + * @param pageId Page ID. + * @param lvl Level. + * @return {@code true} If it is a {@link Remove} and the page is in tail. + */ + private boolean isTail(long pageId, int lvl) { + return isRemove() && ((Remove)op).isTail(pageId, lvl); + } + + /** + */ + private void levelExit() { + if (isRemove()) + ((Remove)op).page = null; + } + + /** + * Release all the resources by the end of operation. + */ + private void releaseAll() throws IgniteCheckedException { + if (isRemove()) + ((Remove)op).releaseAll(); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result onNotFound(Page page, long pageId, long fwdId, int lvl) + throws IgniteCheckedException { + if (op == null) + return NOT_FOUND; + + if (isRemove()) { + assert lvl == 0; + + ((Remove)op).finish(); + + return NOT_FOUND; + } + + return ((Put)op).tryInsert(page, pageId, fwdId, lvl); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param backId Back page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result onFound(Page page, long pageId, long backId, long fwdId, int lvl) + throws IgniteCheckedException { + if (op == null) + return FOUND; + + if (isRemove()) + return ((Remove)op).tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl); + + return ((Put)op).tryReplace(page, pageId, fwdId, lvl); + } + + /** + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result tryFinish() throws IgniteCheckedException { + assert op != null; // Must be guarded by isFinished. + + if (isPut()) + return RETRY; + + Result res = ((Remove)op).finishTail(); + + if (res == NOT_FOUND) + res = RETRY; + + assert res == FOUND || res == RETRY: res; + + return res; + } + + /** {@inheritDoc} */ + @Override boolean isFinished() { + if (closureInvoked != DONE) + return false; + + if (op == null) + return true; + + return op.isFinished(); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException { + if (!isPut()) + return NOT_FOUND; + + return ((Put)op).tryReplaceInner(page, pageId, fwdId, lvl); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param backId Back page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + public Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl) + throws IgniteCheckedException { + return ((Remove)op).finishOrLockTail(page, pageId, backId, fwdId, lvl); + } + } + + /** + * Remove operation. + */ + private final class Remove extends Get implements ReuseBag { + /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */ + Tail<L> tail; + + /** */ + Bool needReplaceInner = FALSE; + + /** */ + Bool needMergeEmptyBranch = FALSE; + + /** Removed row. */ + T rmvd; + + /** Current page. */ + Page page; /** */ - private Object freePages; + Object freePages; /** */ - private final boolean needOld; + final boolean needOld; /** * @param row Row. - * @param ceil If we can remove ceil row when we can not find exact. * @param needOld {@code True} If need return old value. */ - private Remove(L row, boolean ceil, boolean needOld) { + private Remove(L row, boolean needOld) { super(row); - this.ceil = ceil; this.needOld = needOld; } @@ -2551,12 +3029,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements @SuppressWarnings("unchecked") @Override public long pollFreePage() { if (freePages == null) - return 0; + return 0L; if (freePages.getClass() == GridLongList.class) { GridLongList list = ((GridLongList)freePages); - return list.isEmpty() ? 0 : list.remove(); + return list.isEmpty() ? 0L : list.remove(); } long res = (long)freePages; @@ -2569,7 +3047,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void addFreePage(long pageId) { - assert pageId != 0; + assert pageId != 0L; if (freePages == null) freePages = pageId; @@ -2957,6 +3435,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param idx Index to remove. * @throws IgniteCheckedException If failed. */ + @SuppressWarnings("unchecked") private void removeDataRowFromLeaf(Page page, BPlusIO<L> io, long pageAddr, int cnt, int idx) throws IgniteCheckedException { assert idx >= 0 && idx < cnt: idx; @@ -3280,10 +3759,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements return true; } - /** - * @return {@code true} If finished. - */ - private boolean isFinished() { + /** {@inheritDoc} */ + @Override boolean isFinished() { return row == null; } @@ -3438,9 +3915,58 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @param rootLvl Actual root level. * @return {@code true} If tail level is correct. */ - public boolean checkTailLevel(int rootLvl) { + private boolean checkTailLevel(int rootLvl) { return tail == null || tail.lvl < rootLvl; } + + /** + * @throws IgniteCheckedException If failed. + */ + private void releaseAll() throws IgniteCheckedException { + releaseTail(); + reuseFreePages(); + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param backId Back page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl) + throws IgniteCheckedException { + Result res = finishTail(); + + if (res == NOT_FOUND) + res = lockTail(pageId, page, backId, fwdId, lvl); + + return res; + } + + /** + * @param page Page. + * @param pageId Page ID. + * @param backId Back page ID. + * @param fwdId Forward ID. + * @param lvl Level. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + private Result tryRemoveFromLeaf(Page page, long pageId, long backId, long fwdId, int lvl) + throws IgniteCheckedException { + // We must be at the bottom here, just need to remove row from the current page. + assert lvl == 0 : lvl; + + Result res = removeFromLeaf(pageId, page, backId, fwdId); + + if (res == FOUND && tail == null) // Finish if we don't need to do any merges. + finish(); + + return res; + } } /** @@ -3619,7 +4145,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements protected abstract int compare(BPlusIO<L> io, long pageAddr, int idx, L row) throws IgniteCheckedException; /** - * Get the full detached row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}. + * Get a full detached data row. * * @param io IO. * @param pageAddr Page address. @@ -3627,7 +4153,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Full detached data row. * @throws IgniteCheckedException If failed. */ - protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException; + protected final T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException { + return getRow(io, pageAddr, idx, null); + } + + /** + * Get data row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}. + * + * @param io IO. + * @param pageAddr Page address. + * @param idx Index. + * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row. + * @return Data row. + * @throws IgniteCheckedException If failed. + */ + protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException; /** * Forward cursor. http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index fc78f69..0c71731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -97,6 +97,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ + @Override protected long nextPartCounter() { + return locPart.nextUpdateCounter(); + } + + /** {@inheritDoc} */ @Override public int memorySize() throws IgniteCheckedException { int rdrsOverhead; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java index 8dcd205..7eae0d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.util.lang.GridCursor; +import org.jetbrains.annotations.Nullable; /** * Interface for ignite internal tree. @@ -34,6 +35,14 @@ public interface IgniteTree<L, T> { public T put(T val) throws IgniteCheckedException; /** + * @param key Key. + * @param x Implementation specific argument, {@code null} always means that we need a full detached data row. + * @param c Closure. + * @throws IgniteCheckedException If failed. + */ + public void invoke(L key, Object x, InvokeClosure<T> c) throws IgniteCheckedException; + + /** * Returns the value to which the specified key is mapped, or {@code null} if this tree contains no mapping for the * key. * @@ -70,4 +79,42 @@ public interface IgniteTree<L, T> { * @throws IgniteCheckedException If failed. */ public long size() throws IgniteCheckedException; + + /** + * + */ + interface InvokeClosure<T> { + /** + * + * @param row Old row or {@code null} if old row not found. + * @throws IgniteCheckedException If failed. + */ + void call(@Nullable T row) throws IgniteCheckedException; + + /** + * @return New row for {@link OperationType#PUT} operation. + */ + T newRow(); + + /** + * @return Operation type for this closure or {@code null} if it is unknown. + * After method {@link #call(Object)} has been called, operation type must + * be know and this method can not return {@code null}. + */ + OperationType operationType(); + } + + /** + * + */ + enum OperationType { + /** */ + NOOP, + + /** */ + REMOVE, + + /** */ + PUT + } }
