This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-11704 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 08b59ddcfb0739e9233ff46b0f438cc099604cc3 Author: sboikov <[email protected]> AuthorDate: Fri Jul 26 10:29:12 2019 +0300 ignite-11704 --- .../processors/cache/IncompleteCacheObject.java | 4 + .../processors/cache/IncompleteObject.java | 2 +- .../IgniteCacheDatabaseSharedManager.java | 112 ++++++++++++++++++--- 3 files changed, 105 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java index dedb3bd..5f75125 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteCacheObject.java @@ -75,6 +75,10 @@ public class IncompleteCacheObject extends IncompleteObject<CacheObject> { super.readData(buf); } + public int dataOffset() { + return off; + } + /** * @return Data type. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java index 7c24c12..27c9def 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java @@ -33,7 +33,7 @@ public class IncompleteObject<T> { private T obj; /** */ - private int off; + protected int off; /** * @param data Data bytes. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index feb2e78..567c104 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -139,7 +139,10 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap private volatile boolean firstEvictWarn; /** */ - private CacheObject TOMBSTONE_VAL; + private byte[] tombstoneBytes; + + /** */ + private CacheObject tombstoneVal; /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { @@ -156,11 +159,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap initDataRegions(memCfg); - TOMBSTONE_VAL = new CacheObjectImpl(null, cctx.marshaller().marshal(null)); + tombstoneBytes = cctx.marshaller().marshal(null); + + tombstoneVal = new CacheObjectImpl(null, tombstoneBytes); } public CacheObject tombstoneValue() { - return TOMBSTONE_VAL; + return tombstoneVal; } public boolean isTombstone(CacheDataRow row) throws IgniteCheckedException { @@ -172,22 +177,107 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap assert val != null : row; if (val.cacheObjectType() == CacheObject.TYPE_REGULAR) { - byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null); byte[] bytes = val.valueBytes(null); - if (Arrays.equals(nullBytes, bytes)) + if (Arrays.equals(tombstoneBytes, bytes)) return true; } return false; } + /** + * @param buf Buffer. + * @param key Row key. + * @param incomplete Incomplete object. + * @return Tombstone flag or {@code null} if there is not enough data. + * @throws IgniteCheckedException If failed. 
+ */ public Boolean isTombstone(ByteBuffer buf, @Nullable KeyCacheObject key, - @Nullable IncompleteCacheObject incomplete) { - // TODO IGNITE-11704 + @Nullable IncompleteCacheObject incomplete) throws IgniteCheckedException { + if (key == null) { + if (incomplete == null) { // Did not start read key yet. + if (buf.remaining() < IncompleteCacheObject.HEAD_LEN) { + return null; + } + + int keySize = buf.getInt(buf.position()); + + int headOffset = (IncompleteCacheObject.HEAD_LEN + keySize) /* key */ + + 8 /* expire time */; + + int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header. + + if (buf.remaining() < requiredSize) + return null; + + return isTombstone(buf, headOffset); + } + else { // Reading key, check if there is enough data to check value header. + byte[] data = incomplete.data(); + + if (data == null) // Header is not available yet. + return null; + + int keyRemaining = data.length - incomplete.dataOffset(); + + assert keyRemaining > 0 : keyRemaining; + + int headOffset = keyRemaining + 8 /* expire time */; + + int requiredSize = headOffset + IncompleteCacheObject.HEAD_LEN; // Value header. + + if (buf.remaining() < requiredSize) + return null; + + return isTombstone(buf, headOffset); + } + } + + if (incomplete == null) { // Did not start read value yet. + if (buf.remaining() < IncompleteCacheObject.HEAD_LEN) + return null; + + return isTombstone(buf, 0); + } + + byte[] data = incomplete.data(); + + if (data == null) // Header is not available yet. + return null; + + if (incomplete.type() != CacheObject.TYPE_REGULAR || data.length != tombstoneBytes.length) + return Boolean.FALSE; + return null; - } + } + + /** + * @param buf Buffer. + * @param offset Value offset. + * @return Tombstone flag or {@code null} if there is not enough data. + * @throws IgniteCheckedException If failed. 
+ */ + private Boolean isTombstone(ByteBuffer buf, int offset) throws IgniteCheckedException { + int valLen = buf.getInt(buf.position() + offset); + if (valLen != tombstoneBytes.length) + return Boolean.FALSE; + + byte valType = buf.get(buf.position() + offset + 4); + if (valType != CacheObject.TYPE_REGULAR) + return Boolean.FALSE; + + if (buf.remaining() < (offset + 5 + tombstoneBytes.length)) + return null; + + for (int i = 0; i < tombstoneBytes.length; i++) { + if (tombstoneBytes[i] != buf.get(buf.position() + offset + 5 + i)) + return Boolean.FALSE; + } + + return Boolean.TRUE; + } public boolean isTombstone(long addr) throws IgniteCheckedException { int off = 0; @@ -197,11 +287,9 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap if (type != CacheObject.TYPE_REGULAR) return false; - byte[] nullBytes = TOMBSTONE_VAL.valueBytes(null); - int len = PageUtils.getInt(addr, off); - if (len != nullBytes.length) + if (len != tombstoneBytes.length) return false; off += 5; @@ -209,7 +297,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap for (int i = 0; i < len; i++) { byte b = PageUtils.getByte(addr, off++); - if (nullBytes[i] != b) + if (tombstoneBytes[i] != b) return false; }
