Repository: ignite Updated Branches: refs/heads/ignite-3477 c875221a4 -> 8ef417c14
ignite-3477 Optimizations: - in-place update if value size does not change - do not need to call part.onUpdateReceived if persistence is disabled - do not create unused key instance for cache.get operation Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8ef417c1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8ef417c1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8ef417c1 Branch: refs/heads/ignite-3477 Commit: 8ef417c14e4ef966415478cd3cad4ae7f638f668 Parents: c875221 Author: sboikov <[email protected]> Authored: Tue Jan 24 15:29:20 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Jan 24 15:29:20 2017 +0300 ---------------------------------------------------------------------- .../internal/pagemem/wal/record/WALRecord.java | 5 +- .../wal/record/delta/DataPageUpdateRecord.java | 79 ++++++++++++ .../processors/cache/CacheLazyEntry.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 64 ++++++---- .../cache/IgniteCacheOffheapManager.java | 8 +- .../cache/IgniteCacheOffheapManagerImpl.java | 124 +++++++++++++++---- .../cache/database/CacheDataRowAdapter.java | 70 +++++++---- .../processors/cache/database/RowStore.java | 12 ++ .../cache/database/freelist/FreeList.java | 8 ++ .../cache/database/freelist/FreeListImpl.java | 69 +++++++++-- .../cache/database/tree/BPlusTree.java | 44 +++++-- .../cache/database/tree/io/DataPageIO.java | 57 +++++++-- .../distributed/dht/GridDhtCacheEntry.java | 4 +- .../distributed/dht/GridDhtLocalPartition.java | 14 +-- .../colocated/GridDhtDetachedCacheEntry.java | 9 +- .../distributed/near/GridNearCacheEntry.java | 5 +- .../query/h2/database/H2RowFactory.java | 2 +- 17 files changed, 456 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 959ad7d..f761f68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -154,7 +154,10 @@ public abstract class WALRecord { PAGE_LIST_META_RESET_COUNT_RECORD, /** Switch segment record. */ - SWITCH_SEGMENT_RECORD + SWITCH_SEGMENT_RECORD, + + /** */ + DATA_PAGE_UPDATE_RECORD ; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java new file mode 100644 index 0000000..65b7172 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record.delta; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; + +/** + * Update existing record in data page. + */ +public class DataPageUpdateRecord extends PageDeltaRecord { + /** */ + private int itemId; + + /** */ + private byte[] payload; + + /** + * @param cacheId Cache ID. + * @param pageId Page ID. + * @param itemId Item ID. + * @param payload Record data. + */ + public DataPageUpdateRecord( + int cacheId, + long pageId, + int itemId, + byte[] payload + ) { + super(cacheId, pageId); + + this.payload = payload; + this.itemId = itemId; + } + + /** + * @return Item ID. + */ + public int itemId() { + return itemId; + } + + /** + * @return Record data. 
+ */ + public byte[] payload() { + return payload; + } + + /** {@inheritDoc} */ + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + assert payload != null; + + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + + io.updateRow(pageAddr, itemId, pageMem.pageSize(), payload, null, 0); + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.DATA_PAGE_UPDATE_RECORD; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java index be6019e..a4bb6bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java @@ -182,7 +182,7 @@ public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> { * * @param updateCntr Update counter. 
*/ - public void updateCounter(Long updateCntr) { + public void updateCounter(long updateCntr) { this.updateCntr = updateCntr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ee39ed9..6dc1d04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -349,7 +349,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** {@inheritDoc} */ @Nullable @Override public final CacheObject unswap(boolean needVal) throws IgniteCheckedException, GridCacheEntryRemovedException { - return unswap(needVal, true); + CacheDataRow row = unswap(needVal, true); + + return row != null ? row.value() : null; } /** @@ -361,7 +363,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @throws IgniteCheckedException If failed. * @throws GridCacheEntryRemovedException If entry was removed. 
*/ - @Nullable protected CacheObject unswap(boolean needVal, boolean checkExpire) + @Nullable protected CacheDataRow unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException, GridCacheEntryRemovedException { boolean obsolete = false; boolean deferred = false; @@ -385,7 +387,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme : 0; if (delta >= 0) - return val; + return read; else { if (onExpired(this.val, null)) { if (cctx.deferredDelete()) { @@ -542,8 +544,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = this.val; if (val == null) { - if (isStartVersion()) - val = unswap(true, false); + if (isStartVersion()) { + unswap(true, false); + + val = this.val; + } } if (val != null) { @@ -668,7 +673,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme long expTime = CU.toExpireTime(ttl); // Update indexes before actual write to entry. - storeValue(ret, expTime, nextVer); + storeValue(ret, expTime, nextVer, null); update(ret, expTime, ttl, nextVer, true); @@ -751,7 +756,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Update indexes. 
if (ret != null) { - storeValue(ret, expTime, nextVer); + storeValue(ret, expTime, nextVer, null); if (cctx.deferredDelete() && !isInternal() && !detached() && deletedUnlocked()) deletedUnlocked(false); @@ -906,7 +911,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert val != null; - storeValue(val, expireTime, newVer); + storeValue(val, expireTime, newVer, null); if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); @@ -1025,7 +1030,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheLazyEntry entry0 = null; - Long updateCntr0; + long updateCntr0; boolean deferred; @@ -1248,9 +1253,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); + CacheDataRow oldRow = null; + // Load and remove from swap if it is new. if (isNew()) - unswap(retval, false); + oldRow = unswap(retval, false); old = val; @@ -1284,7 +1291,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme old = cctx.kernalContext().cacheObjects().prepareForCache(old, cctx); if (old != null) - storeValue(old, expireTime, ver); + storeValue(old, expireTime, ver, oldRow); else removeValue(); @@ -1429,7 +1436,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().put(null, key, updated, ver); - storeValue(updated, expireTime, ver); + storeValue(updated, expireTime, ver, oldRow); assert ttl != CU.TTL_ZERO; @@ -1593,9 +1600,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); + CacheDataRow oldRow = null; + // Load and remove from swap if it is new. if (isStartVersion()) - unswap(retval, false); + oldRow = unswap(retval, false); // Prepare old value. 
oldVal = val; @@ -1632,7 +1641,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (oldVal != null) - storeValue(oldVal, initExpireTime, ver); + storeValue(oldVal, initExpireTime, ver, oldRow); // else nothing to do, real old value was null. update(oldVal, initExpireTime, initTtl, ver, true); @@ -2061,7 +2070,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme logUpdate(op, updated, newVer, newExpireTime, updateCntr0); - storeValue(updated, newExpireTime, newVer); + storeValue(updated, newExpireTime, newVer, oldRow); update(updated, newExpireTime, newTtl, newVer, true); @@ -2715,7 +2724,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ttlAndExpireTimeExtras(ttl, expireTime); - storeValue(val, expireTime, ver); + storeValue(val, expireTime, ver, null); } /** @@ -2944,7 +2953,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); if (val != null) - storeValue(val, expTime, ver); + storeValue(val, expTime, ver, null); update(val, expTime, ttl, ver, true); @@ -3013,7 +3022,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param cntr Updated partition counter. */ - protected void onUpdateFinished(Long cntr) { + protected void onUpdateFinished(long cntr) { // No-op. } @@ -3068,7 +3077,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme throws IgniteCheckedException, GridCacheEntryRemovedException { boolean isNew = isStartVersion(); - CacheObject val = isNew ? 
unswap(true, false) : this.val; + if (isNew) + unswap(true, false); + + CacheObject val = this.val; return new GridCacheLazyPlainVersionedEntry<>(cctx, key, @@ -3117,7 +3129,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); if (val != null) { - storeValue(val, expTime, newVer); + storeValue(val, expTime, newVer, null); if (deletedUnlocked()) deletedUnlocked(false); @@ -3544,15 +3556,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param val Value. * @param expireTime Expire time. * @param ver New entry version. + * @param oldRow Old row if available. * @throws IgniteCheckedException If update failed. */ protected void storeValue(@Nullable CacheObject val, long expireTime, - GridCacheVersion ver) throws IgniteCheckedException { + GridCacheVersion ver, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException { assert Thread.holdsLock(this); assert val != null : "null values in update for key: " + key; - cctx.offheap().update(key, val, ver, expireTime, partition(), localPartition()); + cctx.offheap().update(key, + val, + ver, + expireTime, + partition(), + localPartition(), + oldRow); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index f01f0ff..a869b21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -112,6 +112,7 @@ public interface 
IgniteCacheOffheapManager extends GridCacheManager { * @param ver Version. * @param expireTime Expire time. * @param partId Partition number. + * @param oldRow Old row if available. * @param part Partition. * @throws IgniteCheckedException If failed. */ @@ -121,7 +122,8 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { GridCacheVersion ver, long expireTime, int partId, - GridDhtLocalPartition part + GridDhtLocalPartition part, + @Nullable CacheDataRow oldRow ) throws IgniteCheckedException; /** @@ -299,13 +301,15 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { * @param val Value. * @param ver Version. * @param expireTime Expire time. + * @param oldRow Old row if available. * @throws IgniteCheckedException If failed. */ void update(KeyCacheObject key, int part, CacheObject val, GridCacheVersion ver, - long expireTime) throws IgniteCheckedException; + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException; /** * @param key Key. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index aab5a75..5b788fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -106,6 +106,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** */ private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + /** */ + private int updateValSizeThreshold; + /** {@inheritDoc} */ @Override public GridAtomicLong globalRemoveId() { return globalRmvId; @@ -117,6 +120,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple indexingEnabled = GridQueryProcessor.isEnabled(cctx.config()); + updateValSizeThreshold = cctx.kernalContext().config().getMemoryConfiguration().getPageSize() / 2; + if (cctx.affinityNode()) { if (cctx.kernalContext().clientNode()) { assert cctx.isLocal() : cctx.name(); @@ -327,11 +332,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple GridCacheVersion ver, long expireTime, int partId, - GridDhtLocalPartition part + GridDhtLocalPartition part, + @Nullable CacheDataRow oldRow ) throws IgniteCheckedException { assert expireTime >= 0; - dataStore(part).update(key, partId, val, ver, expireTime); + dataStore(part).update(key, partId, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ @@ -880,30 +886,77 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple return name; } + /** + * @param 
oldRow Old row. + * @param dataRow New row. + * @return {@code True} if it is possible to update old row data. + * @throws IgniteCheckedException If failed. + */ + private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow) + throws IgniteCheckedException { + if (oldRow == null || indexingEnabled) + return false; + + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx); + + if (oldLen > updateValSizeThreshold) + return false; + + int newLen = dataRow.key().valueBytesLength(coCtx) + dataRow.value().valueBytesLength(coCtx); + + return oldLen == newLen; + } + /** {@inheritDoc} */ @Override public void update(KeyCacheObject key, int p, CacheObject val, GridCacheVersion ver, - long expireTime) throws IgniteCheckedException { - DataRow dataRow = new DataRow(key, val, ver, p, expireTime); - - // Make sure value bytes initialized. - key.valueBytes(cctx.cacheObjectContext()); - val.valueBytes(cctx.cacheObjectContext()); + long expireTime, + @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert oldRow == null || oldRow.link() != 0L : oldRow; if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - rowStore.addRow(dataRow); + DataRow dataRow = new DataRow(key, val, ver, p, expireTime); + + CacheObjectContext coCtx = cctx.cacheObjectContext(); + + // Make sure value bytes initialized. 
+ key.valueBytes(coCtx); + val.valueBytes(coCtx); + + CacheDataRow old; - assert dataRow.link() != 0 : dataRow; + boolean rmvOld = true; - CacheDataRow old = dataTree.put(dataRow); + if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { + old = oldRow; - if (old == null) - storageSize.incrementAndGet(); + dataRow.link(oldRow.link()); + + rmvOld = false; + } + else { + rowStore.addRow(dataRow); + + assert dataRow.link() != 0 : dataRow; + + if (oldRow != null) { + old = oldRow; + + dataTree.putx(dataRow); + } + else + old = dataTree.put(dataRow); + + if (old == null) + storageSize.incrementAndGet(); + } if (indexingEnabled) { GridCacheQueryManager qryMgr = cctx.queries(); @@ -922,7 +975,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (pendingEntries != null && old.expireTime() != 0) pendingEntries.removex(new PendingRow(old.expireTime(), old.link())); - rowStore.removeRow(old.link()); + if (rmvOld) + rowStore.removeRow(old.link()); } if (pendingEntries != null && expireTime != 0) @@ -978,9 +1032,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public CacheDataRow find(KeyCacheObject key) - throws IgniteCheckedException { - return dataTree.findOne(new SearchRow(key)); + @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { + key.valueBytes(cctx.cacheObjectContext()); + + CacheDataRow row = dataTree.findOne(new SearchRow(key), dataTree.noKeyC); + + if (row != null) + ((CacheDataRowAdapter)row).key(key); + + return row; } /** {@inheritDoc} */ @@ -1133,9 +1193,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * @param hash Hash code. * @param link Link. - * @param keyOnly If {@code true} initializes only key. + * @param rowData Required row data. 
*/ - DataRow(int hash, long link, boolean keyOnly) { + DataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) { super(link); this.hash = hash; @@ -1144,7 +1204,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple try { // We can not init data row lazily because underlying buffer can be concurrently cleared. - initFromLink(cctx, keyOnly); + initFromLink(cctx, rowData); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1195,6 +1255,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** */ private final GridCacheContext cctx; + /** */ + private final RowClosure<CacheSearchRow, CacheDataRow> noKeyC = new RowClosure<CacheSearchRow, CacheDataRow>() { + @Override public CacheDataRow row(BPlusIO<CacheSearchRow> io, long pageAddr, int idx) + throws IgniteCheckedException { + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + long link = ((RowLinkIO)io).getLink(pageAddr, idx); + + return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.NO_KEY); + } + }; + /** * @param name Tree name. * @param reuseList Reuse list. @@ -1248,7 +1319,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple int hash = ((RowLinkIO)io).getHash(pageAddr, idx); long link = ((RowLinkIO)io).getLink(pageAddr, idx); - return rowStore.dataRow(hash, link); + return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.FULL); } /** @@ -1304,7 +1375,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple // TODO GG-11768. CacheDataRowAdapter other = new CacheDataRowAdapter(link); - other.initFromLink(cctx, true); + other.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext()); byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext()); @@ -1344,16 +1415,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @return Search row. 
*/ private CacheSearchRow keySearchRow(int hash, long link) { - return new DataRow(hash, link, true); + return new DataRow(hash, link, CacheDataRowAdapter.RowData.KEY_ONLY); } /** * @param hash Hash code. * @param link Link. + * @param rowData Required row data. * @return Data row. */ - private CacheDataRow dataRow(int hash, long link) { - return new DataRow(hash, link, false); + private CacheDataRow dataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) { + return new DataRow(hash, link, rowData); } } @@ -1552,7 +1624,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheDataRowAdapter rowData = new CacheDataRowAdapter(link); - rowData.initFromLink(cctx, true); + rowData.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); row.key = rowData.key(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index 5288aad..4bfdd99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -76,10 +76,10 @@ public class CacheDataRowAdapter implements CacheDataRow { * Read row from data pages. * * @param cctx Cache context. - * @param keyOnly {@code true} If need to read only key object. + * @param rowData Required row data. * @throws IgniteCheckedException If failed. 
*/ - public final void initFromLink(GridCacheContext<?, ?> cctx, boolean keyOnly) throws IgniteCheckedException { + public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException { assert cctx != null : "cctx"; assert link != 0 : "link"; assert key == null : "key"; @@ -110,7 +110,7 @@ public class CacheDataRowAdapter implements CacheDataRow { if (first) { if (nextLink == 0) { // Fast path for a single page row. - readFullRow(coctx, pageAddr + data.offset(), keyOnly); + readFullRow(coctx, pageAddr + data.offset(), rowData); return; } @@ -123,6 +123,8 @@ public class CacheDataRowAdapter implements CacheDataRow { buf.position(data.offset()); buf.limit(data.offset() + data.payloadSize()); + boolean keyOnly = rowData == RowData.KEY_ONLY; + incomplete = readFragment(coctx, buf, keyOnly, incomplete); if (keyOnly && key != null) @@ -191,36 +193,37 @@ public class CacheDataRowAdapter implements CacheDataRow { /** * @param coctx Cache object context. * @param addr Address. - * @param keyOnly {@code true} If need to read only key object. + * @param rowData Required row data. * @throws IgniteCheckedException If failed. 
*/ - private void readFullRow(CacheObjectContext coctx, long addr, boolean keyOnly) throws IgniteCheckedException { + private void readFullRow(CacheObjectContext coctx, long addr, RowData rowData) throws IgniteCheckedException { int off = 0; int len = PageUtils.getInt(addr, off); off += 4; - byte type = PageUtils.getByte(addr, off); - off++; + if (rowData != RowData.NO_KEY) { + byte type = PageUtils.getByte(addr, off); + off++; - byte[] bytes = PageUtils.getBytes(addr, off, len); - off += len; + byte[] bytes = PageUtils.getBytes(addr, off, len); + off += len; - key = coctx.processor().toKeyCacheObject(coctx, type, bytes); + key = coctx.processor().toKeyCacheObject(coctx, type, bytes); - if (keyOnly) { - assert key != null: "key"; - - return; + if (rowData == RowData.KEY_ONLY) + return; } + else + off += len + 1; len = PageUtils.getInt(addr, off); off += 4; - type = PageUtils.getByte(addr, off); + byte type = PageUtils.getByte(addr, off); off++; - bytes = PageUtils.getBytes(addr, off, len); + byte[] bytes = PageUtils.getBytes(addr, off, len); off += len; val = coctx.processor().toCacheObject(coctx, type, bytes); @@ -230,8 +233,6 @@ public class CacheDataRowAdapter implements CacheDataRow { off += CacheVersionIO.size(ver, false); expireTime = PageUtils.getLong(addr, off); - - assert isReady(): "ready"; } /** @@ -391,6 +392,15 @@ public class CacheDataRowAdapter implements CacheDataRow { return key; } + /** + * @param key Key. + */ + public void key(KeyCacheObject key) { + assert key != null; + + this.key = key; + } + /** {@inheritDoc} */ @Override public CacheObject value() { assert val != null : "Value is not ready: " + this; @@ -430,11 +440,6 @@ public class CacheDataRowAdapter implements CacheDataRow { throw new UnsupportedOperationException(); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheDataRowAdapter.class, this, "link", U.hexLong(link)); - } - /** * @param pageId Page ID. * @param cctx Cache context. 
@@ -444,4 +449,23 @@ public class CacheDataRowAdapter implements CacheDataRow { private Page page(final long pageId, final GridCacheContext cctx) throws IgniteCheckedException { return cctx.shared().database().pageMemory().page(cctx.cacheId(), pageId); } + + /** + * + */ + public enum RowData { + /** */ + FULL, + + /** */ + KEY_ONLY, + + /** */ + NO_KEY + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheDataRowAdapter.class, this, "link", U.hexLong(link)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java index be24b52..8d54542 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java @@ -42,6 +42,7 @@ public class RowStore { /** * @param cctx Cache context. + * @param freeList Free list. */ public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) { assert cctx != null; @@ -75,12 +76,23 @@ public class RowStore { /** * @param row Row. + * @throws IgniteCheckedException If failed. */ public void addRow(CacheDataRow row) throws IgniteCheckedException { freeList.insertDataRow(row); } /** + * @param link Row link. + * @param row New row data. + * @throws IgniteCheckedException If failed. + * @return {@code True} if was able to update row. + */ + public boolean updateRow(long link, CacheDataRow row) throws IgniteCheckedException { + return freeList.updateDataRow(link, row); + } + + /** * @return Free list. 
*/ public FreeList freeList() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java index 4cc7664..d72c5b9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeList.java @@ -31,6 +31,14 @@ public interface FreeList { /** * @param link Row link. + * @param row New row data. + * @return {@code True} if was able to update row. + * @throws IgniteCheckedException If failed. + */ + public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException; + + /** + * @param link Row link. * @throws IgniteCheckedException If failed. 
*/ public void removeDataRowByLink(long link) throws IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index 6c1b21b..87d5e4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; +import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; @@ -71,6 +72,38 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { private final int MIN_SIZE_FOR_DATA_PAGE; /** */ + private final PageHandler<CacheDataRow, Boolean> updateRow = + new PageHandler<CacheDataRow, Boolean>() { + @Override public Boolean run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int itemId) + throws IgniteCheckedException { + DataPageIO io = (DataPageIO)iox; + + int rowSize = getRowSize(row); + + boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, 
getRowSize(row)); + + if (updated && isWalDeltaRecordNeeded(wal, page)) { + // TODO This record must contain only a reference to a logical WAL record with the actual data. + byte[] payload = new byte[rowSize]; + + DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize()); + + assert data.payloadSize() == rowSize; + + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); + + wal.log(new DataPageUpdateRecord( + cacheId, + page.id(), + itemId, + payload)); + } + + return updated; + } + }; + + /** */ private final PageHandler<CacheDataRow, Integer> writeRow = new PageHandler<CacheDataRow, Integer>() { @Override public Integer run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int written) @@ -101,7 +134,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param row Row. * @param rowSize Row size. @@ -110,22 +143,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRow( Page page, - long buf, + long pageAddr, DataPageIO io, CacheDataRow row, int rowSize ) throws IgniteCheckedException { - io.addRow(buf, row, rowSize, pageSize()); + io.addRow(pageAddr, row, rowSize, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[rowSize]; - DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize()); assert data.payloadSize() == rowSize; - PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize); + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize); wal.log(new DataPageInsertRecord( cacheId, @@ -138,7 +171,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** * @param page Page. 
- * @param buf Buffer. + * @param pageAddr Page address. * @param io IO. * @param row Row. * @param written Written size. @@ -148,7 +181,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRowFragment( Page page, - long buf, + long pageAddr, DataPageIO io, CacheDataRow row, int written, @@ -157,7 +190,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // Read last link before the fragment write, because it will be updated there. long lastLink = row.link(); - int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize()); + int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize()); assert payloadSize > 0 : payloadSize; @@ -165,9 +198,9 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[payloadSize]; - DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize()); - PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize); + PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize); wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink)); } @@ -334,6 +367,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { } /** {@inheritDoc} */ + @Override public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException { + assert link != 0; + + long pageId = PageIdUtils.pageId(link); + int itemId = PageIdUtils.itemId(link); + + try (Page page = pageMem.page(cacheId, pageId)) { + Boolean updated = writePage(pageMem, page, this, updateRow, row, itemId, null); + + assert updated != null; // Can't fail here. + + return updated != null ? 
updated : false; + } + } + + /** {@inheritDoc} */ @Override public void removeDataRowByLink(long link) throws IgniteCheckedException { assert link != 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index ff4be4b..07829bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -120,6 +120,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** */ private volatile TreeMetaData treeMeta; + /** + * + */ + public static interface RowClosure<L, R> { + /** + * @param io IO. + * @param pageAddr Page address. + * @param idx Index. + * @return Result. + * @throws IgniteCheckedException If failed. + */ + public R row(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException; + } + /** */ private final GridTreePrinter<Long> treePrinter = new GridTreePrinter<Long>() { /** */ @@ -815,19 +829,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param row Lookup row for exact match. - * @return Found row. + * @param c Found row closure. + * @return Found result. * @throws IgniteCheckedException If failed. 
*/ - @SuppressWarnings("unchecked") - @Override public final T findOne(L row) throws IgniteCheckedException { + public final <R> R findOne(L row, RowClosure<L, R> c) throws IgniteCheckedException { checkDestroyed(); try { - GetOne g = new GetOne(row); + GetOne g = new GetOne(row, c); doFind(g); - return (T)g.row; + return (R)g.row; } catch (IgniteCheckedException e) { throw new IgniteCheckedException("Runtime failure on lookup row: " + row, e); @@ -841,6 +855,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** + * @param row Lookup row for exact match. + * @return Found row. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + @Override public final T findOne(L row) throws IgniteCheckedException { + return findOne(row, null); + } + + /** * @param g Get. * @throws IgniteCheckedException If failed. */ @@ -2052,11 +2076,17 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * Get a single entry. */ private final class GetOne extends Get { + /** */ + private final RowClosure<L, ?> c; + /** * @param row Row. + * @param c Row closure. */ - private GetOne(L row) { + private GetOne(L row, RowClosure<L, ?> c) { super(row); + + this.c = c; } /** {@inheritDoc} */ @@ -2065,7 +2095,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements if (lvl != 0 && !canGetRowFromInner) return false; - row = getRow(io, pageAddr, idx); + row = c != null ? 
(L)c.row(io, pageAddr, idx) : getRow(io, pageAddr, idx); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index 548e300..fdb812f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.SB; +import org.jetbrains.annotations.Nullable; /** * Data pages IO. @@ -599,6 +600,39 @@ public class DataPageIO extends PageIO { /** * @param pageAddr Page address. + * @param itemId Item ID. + * @param pageSize Page size. + * @param payload Row data. + * @param row Row. + * @param rowSize Row size. + * @return {@code True} if entry is not fragmented. + * @throws IgniteCheckedException If failed. 
+ */ + public boolean updateRow( + final long pageAddr, + int itemId, + int pageSize, + @Nullable byte[] payload, + @Nullable CacheDataRow row, + final int rowSize) throws IgniteCheckedException { + assert checkIndex(itemId) : itemId; + assert row != null ^ payload != null; + + final int dataOff = getDataOffset(pageAddr, itemId, pageSize); + + if (isFragmented(pageAddr, dataOff)) + return false; + + if (row != null) + writeRowData(pageAddr, dataOff, rowSize, row, false); + else + writeRowData(pageAddr, dataOff, payload); + + return true; + } + + /** + * @param pageAddr Page address. * @param itemId Fixed item ID (the index used for referencing an entry from the outside). * @param pageSize Page size. * @return Next link for fragmented entries or {@code 0} if none. @@ -727,7 +761,7 @@ public class DataPageIO extends PageIO { int dataOff = getDataOffsetForWrite(pageAddr, fullEntrySize, directCnt, indirectCnt, pageSize); - writeRowData(pageAddr, dataOff, rowSize, row); + writeRowData(pageAddr, dataOff, rowSize, row, true); int itemId = addItem(pageAddr, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); @@ -948,11 +982,11 @@ public class DataPageIO extends PageIO { /** * @param row Row to set link to. - * @param buf Page buffer. + * @param pageAddr Page address. * @param itemId Item ID. */ - private void setLink(CacheDataRow row, long buf, int itemId) { - row.link(PageIdUtils.link(getPageId(buf), itemId)); + private void setLink(CacheDataRow row, long pageAddr, int itemId) { + row.link(PageIdUtils.link(getPageId(pageAddr), itemId)); } /** @@ -1252,20 +1286,27 @@ public class DataPageIO extends PageIO { * @param dataOff Data offset. * @param payloadSize Payload size. * @param row Data row. + * @param newRow {@code False} if existing cache entry is updated, in this case skip key data write. * @throws IgniteCheckedException If failed. 
*/ private void writeRowData( long pageAddr, int dataOff, int payloadSize, - CacheDataRow row + CacheDataRow row, + boolean newRow ) throws IgniteCheckedException { long addr = pageAddr + dataOff; - PageUtils.putShort(addr, 0, (short)payloadSize); - addr += 2; + if (newRow) { + PageUtils.putShort(addr, 0, (short)payloadSize); + addr += 2; + + addr += row.key().putValue(addr); + } + else + addr += (2 + row.key().valueBytesLength(null)); - addr += row.key().putValue(addr); addr += row.value().putValue(addr); CacheVersionIO.write(addr, row.version(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index b0ec408..fc78f69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -118,8 +118,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void onUpdateFinished(Long cntr) { - if (cntr != null) + @Override protected void onUpdateFinished(long cntr) { + if (cctx.shared().database().persistenceEnabled()) locPart.onUpdateReceived(cntr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 8fbd30a..9b30593 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -126,10 +126,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, private final CacheDataStore store; /** Partition updates. */ - private ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>(); + private final ConcurrentNavigableMap<Long, Boolean> updates = new ConcurrentSkipListMap<>(); /** Last applied update. */ - private AtomicLong lastApplied = new AtomicLong(0); + private final AtomicLong lastApplied = new AtomicLong(0); /** Set if failed to move partition to RENTING state due to reservations, to be checked when * reservation is released. */ @@ -253,16 +253,6 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>, } /** - * @return Last received update. - */ - private long lastReceivedUpdate() { - if (updates.isEmpty()) - return lastApplied.get(); - - return updates.lastKey(); - } - - /** * @param cntr Received counter. 
*/ public void onUpdateReceived(long cntr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index c8deb08..ac81b63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; @@ -58,7 +59,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Nullable @Override public CacheObject unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException { + @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) throws IgniteCheckedException { return null; } @@ -68,8 +69,10 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void 
storeValue(CacheObject val, long expireTime, - GridCacheVersion ver) throws IgniteCheckedException { + @Override protected void storeValue(CacheObject val, + long expireTime, + GridCacheVersion ver, + CacheDataRow oldRow) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary nodes. } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index a8e1ee6..7697e18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvcc; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -430,7 +431,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion ver) { + @Override protected void storeValue(CacheObject val, long expireTime, GridCacheVersion 
ver, CacheDataRow oldRow) { // No-op: queries are disabled for near cache. } @@ -446,7 +447,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Nullable @Override public CacheObject unswap(boolean needVal, boolean checkExpire) { + @Nullable @Override public CacheDataRow unswap(boolean needVal, boolean checkExpire) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8ef417c1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java index 6b6f44b..2024c36 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java @@ -59,7 +59,7 @@ public class H2RowFactory { final CacheDataRowAdapter rowBuilder = new CacheDataRowAdapter(link); - rowBuilder.initFromLink(cctx, false); + rowBuilder.initFromLink(cctx, CacheDataRowAdapter.RowData.FULL); GridH2Row row;
