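Add the ability to write cache objects to byte buffers (CacheObject.putValue) and read them back (IgniteCacheObjectProcessor.toCacheObject), and pass a GridCacheVersion through the query index store/remove path.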
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2476bb1c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2476bb1c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2476bb1c Branch: refs/heads/sql-store-cmp Commit: 2476bb1c8605dc30b4028784c47e3eba041a087d Parents: 53af8a8 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Fri Jan 22 18:53:30 2016 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Feb 3 17:13:41 2016 +0300 ---------------------------------------------------------------------- .../internal/binary/BinaryEnumObjectImpl.java | 5 +++ .../internal/binary/BinaryObjectImpl.java | 13 ++++++++ .../binary/BinaryObjectOffheapImpl.java | 4 +++ .../internal/processors/cache/CacheObject.java | 3 ++ .../processors/cache/CacheObjectAdapter.java | 13 ++++++++ .../cache/CacheObjectByteArrayImpl.java | 12 ++++++++ .../processors/cache/GridCacheMapEntry.java | 32 ++++++++++---------- .../processors/cache/GridCacheProcessor.java | 4 ++- .../binary/CacheObjectBinaryProcessorImpl.java | 1 + .../distributed/dht/GridDhtCacheEntry.java | 2 +- .../colocated/GridDhtDetachedCacheEntry.java | 2 +- .../distributed/near/GridNearCacheEntry.java | 2 +- .../cache/query/GridCacheQueryManager.java | 6 ++-- .../cacheobject/IgniteCacheObjectProcessor.java | 8 +++++ .../IgniteCacheObjectProcessorImpl.java | 14 +++++++++ .../processors/query/GridQueryIndexing.java | 5 +-- .../processors/query/GridQueryProcessor.java | 7 +++-- .../processors/query/h2/IgniteH2Indexing.java | 15 ++++----- .../processors/query/h2/opt/GridH2Table.java | 5 ++- .../query/h2/opt/GridLuceneIndex.java | 5 +-- .../h2/GridIndexingSpiAbstractSelfTest.java | 30 ++++++++++-------- 21 files changed, 138 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java index ab76b6e..ad0d48f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java @@ -206,6 +206,11 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac } /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException("TODO implement."); + } + + /** {@inheritDoc} */ @Override public byte cacheObjectType() { return TYPE_BINARY; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index a500ceb..a379dbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -133,6 +133,19 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern return arr0; } + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + int len = length(); + + if (buf.remaining() < len + 5) + return false; + + buf.put(cacheObjectType()); + buf.putInt(len); + buf.put(arr, start, len); + + 
return true; + } + /** {@inheritDoc} */ @Override public CacheObject prepareForCache(CacheObjectContext ctx) { if (detached()) http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java index ef6dac8..37c7f67 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java @@ -124,6 +124,10 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter return null; } + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + throw new UnsupportedOperationException("TODO implement"); + } + /** {@inheritDoc} */ @Override public long offheapAddress() { return ptr; http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 2385335..6315ec3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.plugin.extensions.communication.Message; import 
org.jetbrains.annotations.Nullable; @@ -48,6 +49,8 @@ public interface CacheObject extends Message { */ public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; + public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException; + /** * @return Object type. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 70f5ea6..21873d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.U; @@ -69,6 +70,18 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable } /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + if (buf.remaining() < valBytes.length + 5) + return false; + + buf.put(cacheObjectType()); + buf.putInt(valBytes.length); + buf.put(valBytes); + + return true; + } + + /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { reader.setBuffer(buf); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java index e961d84..d69e5dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java @@ -75,6 +75,18 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable { } /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + if (buf.remaining() < val.length + 5) + return false; + + buf.put(cacheObjectType()); + buf.putInt(val.length); + buf.put(val); + + return true; + } + + /** {@inheritDoc} */ @Override public byte cacheObjectType() { return TYPE_BYTE_ARR; } http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 48f4275..cc3a2d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -530,7 +530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return val; } else - clearIndex(e.value()); + clearIndex(e.value(), 
e.version()); } } } @@ -788,7 +788,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme releaseSwap(); // Previous value is guaranteed to be null - clearIndex(null); + clearIndex(null, ver); } else { // Read and remove swap entry. @@ -1026,7 +1026,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme deletedUnlocked(false); } else { - clearIndex(old); + clearIndex(old, ver); if (cctx.deferredDelete() && !isInternal() && !detached() && !deletedUnlocked()) deletedUnlocked(true); @@ -1323,7 +1323,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Clear indexes inside of synchronization since indexes // can be updated without actually holding entry lock. - clearIndex(old); + clearIndex(old, ver); boolean hadValPtr = hasOffHeapPointer(); @@ -1513,7 +1513,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (old != null) updateIndex(old, expireTime, ver, null); else - clearIndex(null); + clearIndex(null, ver); update(old, expireTime, ttl, ver); } @@ -1697,7 +1697,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. - clearIndex(old); + clearIndex(old, this.ver); update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); @@ -2055,7 +2055,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (oldVal != null) updateIndex(oldVal, initExpireTime, ver, null); else - clearIndex(null); + clearIndex(null, ver); update(oldVal, initExpireTime, initTtl, ver); @@ -2331,7 +2331,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. 
- clearIndex(oldVal); + clearIndex(oldVal, ver); if (hadVal) { assert !deletedUnlocked(); @@ -2612,7 +2612,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (log.isDebugEnabled()) log.debug("Entry has been marked obsolete: " + this); - clearIndex(val); + clearIndex(val, ver); releaseSwap(); @@ -2793,7 +2793,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme releaseSwap(); - clearIndex(val); + clearIndex(val, ver); onInvalidate(); } @@ -3120,7 +3120,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (delta <= 0) { releaseSwap(); - clearIndex(saveValueForIndexUnlocked()); + clearIndex(saveValueForIndexUnlocked(), ver); return true; } @@ -3577,7 +3577,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - clearIndex(expiredVal); + clearIndex(expiredVal, ver); releaseSwap(); @@ -3750,14 +3750,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param prevVal Previous value (if needed for index update). * @throws IgniteCheckedException If failed. */ - protected void clearIndex(CacheObject prevVal) throws IgniteCheckedException { + protected void clearIndex(CacheObject prevVal, GridCacheVersion prevVer) throws IgniteCheckedException { assert Thread.holdsLock(this); try { GridCacheQueryManager<?, ?> qryMgr = cctx.queries(); if (qryMgr.enabled()) - qryMgr.remove(key(), prevVal); + qryMgr.remove(key(), prevVal, prevVer); } catch (IgniteCheckedException e) { throw new GridCacheIndexUpdateException(e); @@ -3916,7 +3916,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - clearIndex(prev); + clearIndex(prev, ver); // Nullify value after swap. value(null); @@ -3970,7 +3970,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } else - clearIndex(prevVal); + clearIndex(prevVal, ver); // Nullify value after swap. 
value(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5acad6c..48c84d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2874,7 +2874,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { assert val != null; - qryMgr.remove(key, val); + // TODO sql-store + + qryMgr.remove(key, val, null); } catch (IgniteCheckedException e) { U.error(log, "Failed to unmarshal key evicted from swap [swapSpaceName=" + spaceName + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 0fef6f8..04802d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 7ff5bdb..7278f20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -577,7 +577,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { if (log.isDebugEnabled()) log.debug("Entry has been marked obsolete: " + this); - clearIndex(prev); + clearIndex(prev, ver); // Give to GC. update(null, 0L, 0L, ver); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index c06f68b..25e9741 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -78,7 +78,7 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void clearIndex(CacheObject val) throws 
IgniteCheckedException { + @Override protected void clearIndex(CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { // No-op for detached entries, index is updated on primary or backup nodes. } http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 026fb4d..0b9d32a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -452,7 +452,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override protected void clearIndex(CacheObject val) { + @Override protected void clearIndex(CacheObject val, GridCacheVersion ver) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 8f0cab7..4f20b79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -403,7 +403,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. try { - qryProc.store(space, key, val, CU.versionToBytes(ver), expirationTime); + qryProc.store(space, key, val, ver, expirationTime); } finally { invalidateResultCache(); @@ -418,7 +418,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings("SimplifiableIfStatement") - public void remove(CacheObject key, CacheObject val) throws IgniteCheckedException { + public void remove(CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { assert key != null; if (!GridQueryProcessor.isEnabled(cctx.config()) && !(key instanceof GridCacheInternal)) @@ -428,7 +428,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return; // Ignore index update when node is stopping. 
try { - qryProc.remove(space, key, val); + qryProc.remove(space, key, val, ver); } finally { invalidateResultCache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index cadf1a9..b7290b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cacheobject; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -152,6 +153,13 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes); /** + * @param ctx Cache context. + * @param buf Buffer to read from. + * @return Cache object. + */ + public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf); + + /** * @param ctx Context. * @param valPtr Value pointer. * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 54dd69e..c5a52d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cacheobject; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -169,6 +170,19 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ + @Override public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf) { + byte type = buf.get(); + + int len = buf.getInt(); + + byte[] data = new byte[len]; + + buf.get(data); + + return toCacheObject(ctx, type, data); + } + + /** {@inheritDoc} */ @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, boolean userObj) http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 7697a12..0ce93f1 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteBiTuple; @@ -186,7 +187,7 @@ public interface GridQueryIndexing { * @throws IgniteCheckedException If failed. */ public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject key, CacheObject val, - byte[] ver, long expirationTime) throws IgniteCheckedException; + GridCacheVersion ver, long expirationTime) throws IgniteCheckedException; /** * Removes index entry by key. @@ -196,7 +197,7 @@ public interface GridQueryIndexing { * @param val Value. * @throws IgniteCheckedException If failed. */ - public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException; + public void remove(@Nullable String spaceName, CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException; /** * Will be called when entry with given key is swapped. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 64bbc8f..f6d19e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -637,7 +638,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") public void store(final String space, final CacheObject key, final CacheObject val, - byte[] ver, long expirationTime) throws IgniteCheckedException { + GridCacheVersion ver, long expirationTime) throws IgniteCheckedException { assert key != null; assert val != null; @@ -992,7 +993,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { * @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ - public void remove(String space, CacheObject key, CacheObject val) throws IgniteCheckedException { + public void remove(String space, CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { assert key != null; if (log.isDebugEnabled()) @@ -1011,7 +1012,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new IllegalStateException("Failed to remove from index (grid is stopping)."); try { - idx.remove(space, key, val); + idx.remove(space, key, val, ver); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 77156e0..aa79e7c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter; @@ -450,7 +451,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param tblToUpdate Table to update. 
* @throws IgniteCheckedException In case of error. */ - private void removeKey(@Nullable String spaceName, CacheObject key, TableDescriptor tblToUpdate) + private void removeKey(@Nullable String spaceName, CacheObject key, GridCacheVersion ver, TableDescriptor tblToUpdate) throws IgniteCheckedException { try { Collection<TableDescriptor> tbls = tables(schema(spaceName)); @@ -459,7 +460,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { for (TableDescriptor tbl : tbls) { if (tbl != tblToUpdate && tbl.type().keyClass().isAssignableFrom(keyCls)) { - if (tbl.tbl.update(key, null, 0, true)) { + if (tbl.tbl.update(key, null, ver, 0, true)) { if (tbl.luceneIdx != null) tbl.luceneIdx.remove(key); @@ -512,10 +513,10 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public void store(@Nullable String spaceName, GridQueryTypeDescriptor type, CacheObject k, CacheObject v, - byte[] ver, long expirationTime) throws IgniteCheckedException { + GridCacheVersion ver, long expirationTime) throws IgniteCheckedException { TableDescriptor tbl = tableDescriptor(spaceName, type); - removeKey(spaceName, k, tbl); + removeKey(spaceName, k, ver, tbl); if (tbl == null) return; // Type was rejected. 
@@ -523,7 +524,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (expirationTime == 0) expirationTime = Long.MAX_VALUE; - tbl.tbl.update(k, v, expirationTime, false); + tbl.tbl.update(k, v, ver, expirationTime, false); if (tbl.luceneIdx != null) tbl.luceneIdx.store(k, v, ver, expirationTime); @@ -574,7 +575,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public void remove(@Nullable String spaceName, CacheObject key, CacheObject val) throws IgniteCheckedException { + @Override public void remove(@Nullable String spaceName, CacheObject key, CacheObject val, GridCacheVersion ver) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Removing key from cache query index [locId=" + nodeId + ", key=" + key + ", val=" + val + ']'); @@ -586,7 +587,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { for (TableDescriptor tbl : tables(schema(spaceName))) { if (tbl.type().keyClass().isAssignableFrom(keyCls) && (val == null || tbl.type().valueClass().isAssignableFrom(valCls))) { - if (tbl.tbl.update(key, val, 0, true)) { + if (tbl.tbl.update(key, val, ver, 0, true)) { if (tbl.luceneIdx != null) tbl.luceneIdx.remove(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index f1e5b16..175f5be 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import 
org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.h2.api.TableEngine; import org.h2.command.ddl.CreateTableData; @@ -331,10 +332,12 @@ public class GridH2Table extends TableBase { * @return {@code true} If operation succeeded. * @throws IgniteCheckedException If failed. */ - public boolean update(CacheObject key, CacheObject val, long expirationTime, boolean rmv) + public boolean update(CacheObject key, CacheObject val, GridCacheVersion ver, long expirationTime, boolean rmv) throws IgniteCheckedException { assert desc != null; + // TODO use version here. + GridH2Row row = desc.createRow(key, val, expirationTime); return doUpdate(row, rmv); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java index 957e5f6..814a76b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java @@ -25,6 +25,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import 
org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexType; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; @@ -161,7 +162,7 @@ public class GridLuceneIndex implements Closeable { * @param expires Expiration time. * @throws IgniteCheckedException If failed. */ - public void store(CacheObject k, CacheObject v, byte[] ver, long expires) throws IgniteCheckedException { + public void store(CacheObject k, CacheObject v, GridCacheVersion ver, long expires) throws IgniteCheckedException { CacheObjectContext coctx = objectContext(); Object key = k.isPlatformType() ? k.value(coctx, false) : k; @@ -201,7 +202,7 @@ public class GridLuceneIndex implements Closeable { if (type.valueClass() != String.class) doc.add(new Field(VAL_FIELD_NAME, v.valueBytes(coctx))); - doc.add(new Field(VER_FIELD_NAME, ver)); + doc.add(new Field(VER_FIELD_NAME, ver.toString().getBytes())); doc.add(new Field(EXPIRATION_TIME_FIELD_NAME, DateTools.timeToString(expires, DateTools.Resolution.MILLISECOND), Field.Store.YES, Field.Index.NOT_ANALYZED)); http://git-wip-us.apache.org/repos/asf/ignite/blob/2476bb1c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java index c027b26..cd7a203 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.IgniteLogger; import 
org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor; import org.apache.ignite.internal.processors.query.GridQueryIndexType; @@ -228,50 +229,50 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(spi.query(typeBA.space(), "select * from B.A", Collections.emptySet(), typeBA, null).hasNext()); // Nothing to remove. - spi.remove("A", key(1), aa(1, "", 10)); - spi.remove("B", key(1), ba(1, "", 10, true)); + spi.remove("A", key(1), aa(1, "", 10), null); + spi.remove("B", key(1), ba(1, "", 10, true), null); - spi.store(typeAA.space(), typeAA, key(1), aa(1, "Vasya", 10), "v1".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(1), aa(1, "Vasya", 10), new GridCacheVersion(), 0); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(0, spi.size(typeAB.space(), typeAB, null)); assertEquals(0, spi.size(typeBA.space(), typeBA, null)); spi.store(typeAB.space(), typeAB, key(1), ab(1, "Vasya", 20, "Some text about Vasya goes here."), - "v2".getBytes(), 0); + new GridCacheVersion(), 0); // In one space all keys must be unique. assertEquals(0, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(0, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeBA.space(), typeBA, key(1), ba(2, "Petya", 25, true), "v3".getBytes(), 0); + spi.store(typeBA.space(), typeBA, key(1), ba(2, "Petya", 25, true), new GridCacheVersion(), 0); // No replacement because of different space. 
assertEquals(0, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeBA.space(), typeBA, key(1), ba(2, "Kolya", 25, true), "v4".getBytes(), 0); + spi.store(typeBA.space(), typeBA, key(1), ba(2, "Kolya", 25, true), new GridCacheVersion(), 0); // Replacement in the same table. assertEquals(0, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAA.space(), typeAA, key(2), aa(2, "Valera", 19), "v5".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(2), aa(2, "Valera", 19), new GridCacheVersion(), 0); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAA.space(), typeAA, key(3), aa(3, "Borya", 18), "v6".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(3), aa(3, "Borya", 18), new GridCacheVersion(), 0); assertEquals(2, spi.size(typeAA.space(), typeAA, null)); assertEquals(1, spi.size(typeAB.space(), typeAB, null)); assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.store(typeAB.space(), typeAB, key(4), ab(4, "Vitalya", 20, "Very Good guy"), "v7".getBytes(), 0); + spi.store(typeAB.space(), typeAB, key(4), ab(4, "Vitalya", 20, "Very Good guy"), new GridCacheVersion(), 0); assertEquals(2, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); @@ -333,13 +334,13 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract assertFalse(fieldsRes.iterator().hasNext()); // Remove - spi.remove(typeAA.space(), key(2), aa(2, "Valera", 19)); + spi.remove(typeAA.space(), key(2), aa(2, "Valera", 19), null); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); 
assertEquals(1, spi.size(typeBA.space(), typeBA, null)); - spi.remove(typeBA.space(), key(1), ba(2, "Kolya", 25, true)); + spi.remove(typeBA.space(), key(1), ba(2, "Kolya", 25, true), null); assertEquals(1, spi.size(typeAA.space(), typeAA, null)); assertEquals(2, spi.size(typeAB.space(), typeAB, null)); @@ -378,7 +379,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract spi.unregisterType(typeBA.space(), typeBA); // Should not store but should not fail as well. - spi.store(typeAA.space(), typeAA, key(10), aa(1, "Fail", 100500), "v220".getBytes(), 0); + spi.store(typeAA.space(), typeAA, key(10), aa(1, "Fail", 100500), new GridCacheVersion(), 0); assertEquals(-1, spi.size(typeAA.space(), typeAA, null)); } @@ -584,6 +585,11 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract } /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf, CacheObjectContext ctx) throws IgniteCheckedException { + return false; + } + + /** {@inheritDoc} */ @Override public byte cacheObjectType() { throw new UnsupportedOperationException(); }