ignite-3300 Fixed issue with partition value changing stored in KeyCacheObject. (cherry picked from commit a441bb9)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9d07e3e7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9d07e3e7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9d07e3e7 Branch: refs/heads/ignite-3443 Commit: 9d07e3e7a34d1cbe67a9656630bc6215cd213f0a Parents: f3e4f78 Author: sboikov <[email protected]> Authored: Wed Jul 27 09:19:12 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 27 10:04:28 2016 +0300 ---------------------------------------------------------------------- .../internal/binary/BinaryObjectImpl.java | 11 + .../affinity/GridAffinityProcessor.java | 117 +++++--- .../processors/cache/CacheObjectContext.java | 12 + .../processors/cache/GridCacheAdapter.java | 8 +- .../cache/GridCacheAffinityManager.java | 24 +- .../processors/cache/GridCacheContext.java | 26 +- .../processors/cache/GridCacheEntryInfo.java | 2 +- .../processors/cache/KeyCacheObject.java | 7 + .../processors/cache/KeyCacheObjectImpl.java | 8 + .../cache/binary/CacheObjectBinaryContext.java | 10 +- .../binary/CacheObjectBinaryProcessorImpl.java | 38 ++- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../cacheobject/IgniteCacheObjectProcessor.java | 15 +- .../IgniteCacheObjectProcessorImpl.java | 69 ++++- .../datastreamer/DataStreamerImpl.java | 6 +- .../affinity/AffinityClientNodeSelfTest.java | 73 ++++- .../binary/BinaryMarshallerSelfTest.java | 7 +- .../BinaryObjectBuilderAdditionalSelfTest.java | 46 ++- .../cache/CacheGetEntryAbstractTest.java | 2 +- .../expiry/IgniteCacheTtlCleanupSelfTest.java | 2 +- .../CacheBinaryKeyConcurrentQueryTest.java | 298 +++++++++++++++++++ .../IgniteCacheWithIndexingTestSuite.java | 2 + 23 files changed, 625 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index ae110f1..7b42c03 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -98,6 +98,17 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern } /** {@inheritDoc} */ + @Override public KeyCacheObject copy(int part) { + if (this.part == part) + return this; + + BinaryObjectImpl cp = new BinaryObjectImpl(ctx, arr, start); + cp.part = part; + + return cp; + } + + /** {@inheritDoc} */ @Override public int partition() { return part; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 0d3d36d..19e0842 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -144,6 +145,56 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * @param cacheName Cache name. + * @param key Key. + * @return Key partition. + * @throws IgniteCheckedException If failed. + */ + public int partition(@Nullable String cacheName, Object key) throws IgniteCheckedException { + return partition(cacheName, key, null); + } + + /** + * @param cacheName Cache name (needed only if {@code aff} is not provided. + * @param key Key. + * @param aff Affinity information. + * @return Key partition. + * @throws IgniteCheckedException If failed. + */ + public int partition(@Nullable String cacheName, + Object key, + @Nullable AffinityInfo aff) throws IgniteCheckedException { + if (key instanceof KeyCacheObject) { + int part = ((KeyCacheObject)key).partition(); + + if (part >= 0) + return part; + } + + return partition0(cacheName, key, aff); + } + + /** + * @param cacheName Cache name (needed only if {@code aff} is not provided. + * @param key Key. + * @param aff Affinity. + * @return Key partition. + * @throws IgniteCheckedException If failed. + */ + public int partition0(@Nullable String cacheName, + Object key, + @Nullable AffinityInfo aff) throws IgniteCheckedException { + if (aff == null) { + aff = affinityCache(cacheName, ctx.discovery().topologyVersionEx()); + + if (aff == null) + throw new IgniteCheckedException("Failed to get cache affinity."); + } + + return aff.affFunc.partition(aff.affinityKey(key)); + } + + /** * Maps keys to nodes for given cache. * * @param cacheName Cache name. @@ -206,7 +257,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (affInfo == null) return Collections.emptyList(); - return primaryAndBackups(affInfo, key); + int part = partition(cacheName, key, affInfo); + + return affInfo.assignment.get(part); } /** @@ -224,13 +277,10 @@ public class GridAffinityProcessor extends GridProcessorAdapter { AffinityInfo affInfo = affinityCache(cacheName, ctx.discovery().topologyVersionEx()); - if (affInfo == null || affInfo.mapper == null) + if (affInfo == null) return null; - if (key instanceof CacheObject) - key = ((CacheObject)key).value(affInfo.cacheObjCtx, false); - - return affInfo.mapper.affinityKey(key); + return affInfo.affinityKey(key); } /** @@ -484,10 +534,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException In case of error. */ private <K> ClusterNode primary(AffinityInfo aff, K key) throws IgniteCheckedException { - if (key instanceof CacheObject && !(key instanceof BinaryObject)) - key = ((CacheObject)key).value(aff.cacheObjCtx, false); - - int part = aff.affFunc.partition(aff.mapper.affinityKey(key)); + int part = partition(null, key, aff); Collection<ClusterNode> nodes = aff.assignment.get(part); @@ -497,20 +544,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return nodes.iterator().next(); } - /** - * @param aff Affinity function. - * @param key Key to check. - * @return Primary and backup nodes. - */ - private <K> List<ClusterNode> primaryAndBackups(AffinityInfo aff, K key) { - if (key instanceof CacheObject && !(key instanceof BinaryObject)) - key = ((CacheObject)key).value(aff.cacheObjCtx, false); - - int part = aff.affFunc.partition(aff.mapper.affinityKey(key)); - - return aff.assignment.get(part); - } - /** {@inheritDoc} */ @Override public void printMemoryStats() { X.println(">>>"); @@ -551,6 +584,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** + * @param key Key. + * @return Affinity key. + */ + private Object affinityKey(Object key) { + if (key instanceof CacheObject && !(key instanceof BinaryObject)) + key = ((CacheObject)key).value(cacheObjCtx, false); + + return mapper.affinityKey(key); + } + + /** * @return Cache affinity function. */ private AffinityFunction affinityFunction() { @@ -564,13 +608,6 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return assignment; } - /** - * @return Key mapper. - */ - private AffinityKeyMapper keyMapper() { - return mapper; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(AffinityInfo.class, this); @@ -658,7 +695,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return cache().affinityFunction().partition(key); + return partition0(cacheName, key, cache()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -769,10 +806,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - if (key instanceof CacheObject) - key = ((CacheObject)key).value(cache().cacheObjCtx, false); - - return cache().keyMapper().affinityKey(key); + return cache().affinityKey(key); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -828,7 +862,9 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return cache().assignment().get(partition(key)); + AffinityInfo aff = cache(); + + return aff.assignment().get(GridAffinityProcessor.this.partition(cacheName, key, aff)); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -861,12 +897,17 @@ public class GridAffinityProcessor extends GridProcessorAdapter { Map<Integer, ClusterNode> map = new HashMap<>(); if (!F.isEmpty(parts)) { + AffinityInfo aff = cache(); + for (int p : parts) - map.put(p, mapPartitionToNode(p)); + map.put(p, F.first(aff.assignment().get(p))); } return map; } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } finally { ctx.gateway().readUnlock(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java index d7fdb83..c4203ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java @@ -41,6 +41,9 @@ import org.apache.ignite.internal.util.typedef.F; private IgniteCacheObjectProcessor proc; /** */ + private String cacheName; + + /** */ private AffinityKeyMapper dfltAffMapper; /** */ @@ -63,11 +66,13 @@ import org.apache.ignite.internal.util.typedef.F; * @param addDepInfo {@code true} if deployment info should be associated with the objects of this cache. */ public CacheObjectContext(GridKernalContext kernalCtx, + String cacheName, AffinityKeyMapper dfltAffMapper, boolean cpyOnGet, boolean storeVal, boolean addDepInfo) { this.kernalCtx = kernalCtx; + this.cacheName = cacheName; this.dfltAffMapper = dfltAffMapper; this.cpyOnGet = cpyOnGet; this.storeVal = storeVal; @@ -78,6 +83,13 @@ import org.apache.ignite.internal.util.typedef.F; } /** + * @return Cache name. + */ + public String cacheName() { + return cacheName; + } + + /** * @return {@code True} if peer class loading is enabled. */ public boolean p2pEnabled() { http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 90669e0..1ac94a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1387,15 +1387,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V long start = statsEnabled ? System.nanoTime() : 0L; - boolean keeyBinary = ctx.keepBinary(); + boolean keepBinary = ctx.keepBinary(); - if (keeyBinary) + if (keepBinary) key = (K)ctx.toCacheKeyObject(key); - V val = get(key, !keeyBinary, false); + V val = get(key, !keepBinary, false); if (ctx.config().getInterceptor() != null) { - key = keeyBinary ? (K) ctx.unwrapBinaryIfNeeded(key, true, false) : key; + key = keepBinary ? (K) ctx.unwrapBinaryIfNeeded(key, true, false) : key; val = (V)ctx.config().getInterceptor().onGet(key, val); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 5e843dc..71ae5c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -187,28 +187,36 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** + * @param key Key. + * @return Partition. + */ + public int partition(Object key) { + return partition(key, true); + } + + /** * NOTE: Use this method always when you need to calculate partition id for * a key provided by user. It's required since we should apply affinity mapper * logic in order to find a key that will eventually be passed to affinity function. * * @param key Key. + * @param useKeyPart If {@code true} can use pre-calculated partition stored in KeyCacheObject. * @return Partition. */ - public int partition(Object key) { + public int partition(Object key, boolean useKeyPart) { GridAffinityAssignmentCache aff0 = aff; - if (key instanceof KeyCacheObject && ((KeyCacheObject)key).partition() != -1) - return ((KeyCacheObject)key).partition(); - if (aff0 == null) throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); - int p = affFunction.partition(affinityKey(key)); + if (useKeyPart && (key instanceof KeyCacheObject)) { + int part = ((KeyCacheObject)key).partition(); - if (key instanceof KeyCacheObject) - ((KeyCacheObject)key).partition(p); + if (part != -1) + return part; + } - return p; + return affFunction.partition(affinityKey(key)); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 36d9104..ba923df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1788,20 +1788,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Cache key object. */ public KeyCacheObject toCacheKeyObject(Object obj) { - return toCacheKeyObject(obj, false); - } - - /** - * @param obj Object. - * @return Cache key object. - */ - public KeyCacheObject toCacheKeyObject(Object obj, boolean includePartition) { assert validObjectForCache(obj) : obj; - if (includePartition) - return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true, affinity().partition(obj)); - else - return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, true); + return cacheObjects().toCacheKeyObject(cacheObjCtx, this, obj, true); } /** @@ -1822,7 +1811,7 @@ public class GridCacheContext<K, V> implements Externalizable { public KeyCacheObject toCacheKeyObject(byte[] bytes) throws IgniteCheckedException { Object obj = ctx.cacheObjects().unmarshal(cacheObjCtx, bytes, deploy().localLoader()); - return cacheObjects().toCacheKeyObject(cacheObjCtx, obj, false); + return cacheObjects().toCacheKeyObject(cacheObjCtx, this, obj, false); } /** @@ -1970,21 +1959,12 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Read-only collection of KeyCacheObject instances. */ public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys) { - return cacheKeysView(keys, false); - } - - /** - * @param keys Keys. - * @param includePartition Include partition. - * @return Read-only collection of KeyCacheObject instances. - */ - public Collection<KeyCacheObject> cacheKeysView(Collection<?> keys, final boolean includePartition) { return F.viewReadOnly(keys, new C1<Object, KeyCacheObject>() { @Override public KeyCacheObject apply(Object key) { if (key == null) throw new NullPointerException("Null key."); - return toCacheKeyObject(key, includePartition); + return toCacheKeyObject(key); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index c42e788..f281227 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -407,7 +407,7 @@ public class GridCacheEntryInfo implements Message { Object key0 = ctx.cacheObjects().unmarshal(cacheObjCtx, keyBytes, clsLdr); - key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, key0, false); + key = ctx.cacheObjects().toCacheKeyObject(cacheObjCtx, ctx, key0, false); } else key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java index ffb846c..21b1f89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObject.java @@ -38,7 +38,14 @@ public interface KeyCacheObject extends CacheObject { /** * Sets partition ID for this key. + * * @param part Partition ID. */ public void partition(int part); + + /** + * @param part Partition ID. + * @return Copy of this object with given partition set. + */ + public KeyCacheObject copy(int part); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 35e681c..146e554 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -61,6 +61,14 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb } /** {@inheritDoc} */ + @Override public KeyCacheObject copy(int part) { + if (this.part == part) + return this; + + return new KeyCacheObjectImpl(val, valBytes, part); + } + + /** {@inheritDoc} */ @Override public int partition() { return part; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java index ec01f48..26c713c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java @@ -31,18 +31,24 @@ public class CacheObjectBinaryContext extends CacheObjectContext { /** * @param kernalCtx Kernal context. + * @param cacheName Cache name. * @param binaryEnabled Binary enabled flag. * @param cpyOnGet Copy on get flag. * @param storeVal {@code True} if should store unmarshalled value in cache. * @param depEnabled {@code true} if deployment is enabled for the given cache. */ public CacheObjectBinaryContext(GridKernalContext kernalCtx, + String cacheName, boolean cpyOnGet, boolean storeVal, boolean binaryEnabled, boolean depEnabled) { - super(kernalCtx, binaryEnabled ? new CacheDefaultBinaryAffinityKeyMapper() : - new GridCacheDefaultAffinityKeyMapper(), cpyOnGet, storeVal, depEnabled); + super(kernalCtx, + cacheName, + binaryEnabled ? new CacheDefaultBinaryAffinityKeyMapper() : new GridCacheDefaultAffinityKeyMapper(), + cpyOnGet, + storeVal, + depEnabled); this.binaryEnabled = binaryEnabled; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 8400594..6d980a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -728,6 +728,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm CacheObjectContext ctx0 = super.contextForCache(cfg); CacheObjectContext res = new CacheObjectBinaryContext(ctx, + cfg.getName(), ctx0.copyOnGet(), ctx0.storeValue(), binaryEnabled, @@ -760,43 +761,38 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm } /** {@inheritDoc} */ - @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) { - return toCacheKeyObject(ctx, obj, userObj, -1); - } - - /** {@inheritDoc} */ @Override public KeyCacheObject toCacheKeyObject( CacheObjectContext ctx, + @Nullable GridCacheContext cctx, Object obj, - boolean userObj, - int partition + boolean userObj ) { if (!((CacheObjectBinaryContext)ctx).binaryEnabled()) - return super.toCacheKeyObject(ctx, obj, userObj, partition); + return super.toCacheKeyObject(ctx, cctx, obj, userObj); if (obj instanceof KeyCacheObject) { - if (obj instanceof BinaryObjectImpl) { + KeyCacheObject key = (KeyCacheObject)obj; + + if (key instanceof BinaryObjectImpl) { // Need to create a copy because the key can be reused at the application layer after that (IGNITE-3505). - BinaryObjectImpl bObj = (BinaryObjectImpl)obj; - obj = new BinaryObjectImpl(bObj.context(), bObj.array(), bObj.start()); + key = key.copy(partition(ctx, cctx, key)); } + else if (key.partition() == -1) + // Assume others KeyCacheObjects can not be reused for another cache. + key.partition(partition(ctx, cctx, key)); - ((KeyCacheObject)obj).partition(partition); - - return (KeyCacheObject)obj; + return key; } - if (((CacheObjectBinaryContext)ctx).binaryEnabled()) { - obj = toBinary(obj); + obj = toBinary(obj); - if (obj instanceof KeyCacheObject) { - ((KeyCacheObject)obj).partition(partition); + if (obj instanceof BinaryObjectImpl) { + ((BinaryObjectImpl)obj).partition(partition(ctx, cctx, obj)); - return (KeyCacheObject)obj; - } + return (KeyCacheObject)obj; } - return toCacheKeyObject0(obj, userObj, partition); + return toCacheKeyObject0(ctx, cctx, obj, userObj); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 561c6c6..3616082 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -344,7 +344,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final boolean skipStore = opCtx != null && opCtx.skipStore(); try { - return getAsync0(ctx.toCacheKeyObject(key, true), + return getAsync0(ctx.toCacheKeyObject(key), !ctx.config().isReadFromBackup(), subjId, taskName, @@ -390,7 +390,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<V>>() { @Override public IgniteInternalFuture<V> apply() { - return getAsync0(ctx.toCacheKeyObject(key, true), + return getAsync0(ctx.toCacheKeyObject(key), forcePrimary, subjId0, taskName, @@ -436,7 +436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { @Override public IgniteInternalFuture<Map<K, V>> apply() { - return getAllAsync0(ctx.cacheKeysView(keys, true), + return getAllAsync0(ctx.cacheKeysView(keys), forcePrimary, subjId0, taskName, http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index b2f2704..0d88ef8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -883,7 +883,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (val == null && op != GridCacheOperation.DELETE) continue; - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); if (remapKeys != null && !remapKeys.contains(cacheKey)) continue; @@ -1000,7 +1000,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (val == null && op != GridCacheOperation.DELETE) throw new NullPointerException("Null value."); - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key, true); + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); if (op != TRANSFORM) val = cctx.toCacheObject(val); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java index b8ac301..27000b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java @@ -133,23 +133,14 @@ public interface IgniteCacheObjectProcessor extends GridProcessor { public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException; /** - * @param ctx Cache context. - * @param obj Key value. - * @param userObj If {@code true} then given object is object provided by user and should be copied - * before stored in cache. - * @return Cache key object. - */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj); - - /** - * @param ctx Cache context. + * @param ctx Cache objects context. + * @param cctx Cache context if cache is available. * @param obj Key value. * @param userObj If {@code true} then given object is object provided by user and should be copied * before stored in cache. - * @param partition ID of partition this key belongs to. * @return Cache key object. */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition); + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj); /** * @param ctx Cache context. http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index 3203548..9fd4c1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -108,18 +108,21 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** {@inheritDoc} */ - @Override @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj) { - return toCacheKeyObject(ctx, obj, userObj, -1); - } - - /** {@inheritDoc} */ - @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, boolean userObj, int partition) { + @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, + @Nullable GridCacheContext cctx, + Object obj, + boolean userObj) { if (obj instanceof KeyCacheObject) { - ((KeyCacheObject)obj).partition(partition); + KeyCacheObject key = (KeyCacheObject)obj; + + if (key.partition() == -1) + // Assume all KeyCacheObjects except BinaryObject can not be reused for another cache. + key.partition(partition(ctx, cctx, key)); + return (KeyCacheObject)obj; } - return toCacheKeyObject0(obj, userObj, partition); + return toCacheKeyObject0(ctx, cctx, obj, userObj); } /** @@ -129,11 +132,16 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme * @return Key cache object. */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") - protected KeyCacheObject toCacheKeyObject0(Object obj, boolean userObj, int partititon) { + protected KeyCacheObject toCacheKeyObject0(CacheObjectContext ctx, + @Nullable GridCacheContext cctx, + Object obj, + boolean userObj) { + int part = partition(ctx, cctx, obj); + if (!userObj) - return new KeyCacheObjectImpl(obj, null, partititon); + return new KeyCacheObjectImpl(obj, null, part); - return new UserKeyCacheObjectImpl(obj, partititon); + return new UserKeyCacheObjectImpl(obj, part); } /** {@inheritDoc} */ @@ -207,6 +215,25 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme return new UserCacheObjectImpl(obj, null); } + /** + * @param ctx Cache objects context. + * @param cctx Cache context. + * @param obj Object. + * @return Object partition. + */ + protected final int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj) { + try { + return cctx != null ? + cctx.affinity().partition(obj, false) : + ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get partition"); + + return -1; + } + } + /** {@inheritDoc} */ @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException { assert ccfg != null; @@ -218,6 +245,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme !ccfg.isCopyOnRead(); CacheObjectContext res = new CacheObjectContext(ctx, + ccfg.getName(), ccfg.getAffinityMapper() != null ? ccfg.getAffinityMapper() : new GridCacheDefaultAffinityKeyMapper(), ccfg.isCopyOnRead() && memMode != OFFHEAP_VALUES, storeVal, @@ -304,8 +332,23 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** * @param key Key. */ - UserKeyCacheObjectImpl(Object key, int partition) { - super(key, null, partition); + UserKeyCacheObjectImpl(Object key, int part) { + super(key, null, part); + } + + /** + * @param key Key. + */ + UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) { + super(key, valBytes, part); + } + + /** {@inheritDoc} */ + @Override public KeyCacheObject copy(int part) { + if (this.partition() == part) + return this; + + return new UserKeyCacheObjectImpl(val, valBytes, part); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 21df559..e565cba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -519,12 +519,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1); for (Map.Entry<K, V> entry : entries) - keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true)); + keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, null, entry.getKey(), true)); } Collection<? extends DataStreamerEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() { @Override public DataStreamerEntry apply(Entry<K, V> e) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true); + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, e.getKey(), true); CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true); return new DataStreamerEntry(key, val); @@ -619,7 +619,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed else checkSecurityPermission(SecurityPermission.CACHE_PUT); - KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true); + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, null, key, true); CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true); return addDataInternal(Collections.singleton(new DataStreamerEntry(key0, val0))); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java index 47b01f4..fc50541 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityClientNodeSelfTest.java @@ -57,15 +57,15 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { /** */ private static final String CACHE4 = "cache4"; + /** */ + private static final String CACHE5 = "cache5"; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - if (gridName.equals(getTestGridName(NODE_CNT - 1))) - cfg.setClientMode(true); - CacheConfiguration ccfg1 = new CacheConfiguration(); ccfg1.setBackups(1); @@ -92,7 +92,18 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { ccfg4.setName(CACHE4); ccfg4.setNodeFilter(new TestNodesFilter()); - cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4); + CacheConfiguration ccfg5 = new CacheConfiguration(); + + ccfg5.setBackups(1); + ccfg5.setName(CACHE5); + + if (gridName.equals(getTestGridName(NODE_CNT - 1))) { + cfg.setClientMode(true); + + cfg.setCacheConfiguration(ccfg5); + } + else + cfg.setCacheConfiguration(ccfg1, ccfg2, ccfg3, ccfg4); return cfg; } @@ -123,6 +134,8 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { checkCache(CACHE4, 3); + checkCache(CACHE5, 2); + Ignite client = ignite(NODE_CNT - 1); CacheConfiguration ccfg = new CacheConfiguration(); @@ -157,6 +170,8 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { private void checkCache(String cacheName, int expNodes) { log.info("Test cache: " + cacheName); + Affinity<Object> aff0 = ignite(0).affinity(cacheName); + Ignite client = ignite(NODE_CNT - 1); assertTrue(client.configuration().isClientMode()); @@ -166,14 +181,22 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { for (int i = 0; i < NODE_CNT; i++) { Ignite ignite = ignite(i); - Affinity<Integer> aff = ignite.affinity(cacheName); + Affinity<Object> aff = ignite.affinity(cacheName); for (int part = 0; part < aff.partitions(); part++) { Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(part); assertEquals(expNodes, nodes.size()); + assertEquals(aff0.mapPartitionToPrimaryAndBackups(part), nodes); assertFalse(nodes.contains(clientNode)); + + assertEquals(aff0.partition(part), aff.partition(part)); + + TestKey key = new TestKey(part, part + 1); + + assertEquals(aff0.partition(key), aff.partition(key)); + assertEquals(aff0.mapKeyToPrimaryAndBackups(key), aff.mapKeyToPrimaryAndBackups(key)); } } } @@ -181,6 +204,46 @@ public class AffinityClientNodeSelfTest extends GridCommonAbstractTest { /** * */ + static class TestKey { + /** */ + private int id; + + /** */ + @AffinityKeyMapped + private int affId; + + /** + * @param id ID. + * @param affId Affinity ID. + */ + public TestKey(int id, int affId) { + this.id = id; + this.affId = affId; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey)o; + + return id == testKey.id; + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ private static class TestNodesFilter implements IgnitePredicate<ClusterNode> { /** {@inheritDoc} */ @Override public boolean apply(ClusterNode clusterNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java index 1285db7..f4c1bf7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java @@ -26,7 +26,6 @@ import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.lang.reflect.Modifier; import java.lang.reflect.Proxy; import java.math.BigDecimal; import java.math.BigInteger; @@ -90,13 +89,11 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertNotEquals; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Binary marshaller tests. */ @@ -2401,7 +2398,7 @@ public class BinaryMarshallerSelfTest extends GridCommonAbstractTest { BinaryObjectImpl po = marshal(simpleObject(), marsh); - CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, false, true, false); + CacheObjectContext coCtx = new CacheObjectContext(newContext(), null, null, false, true, false); assert po.value(coCtx, false) == po.value(coCtx, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java index be2ce9b..f999ad3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryObjectBuilderAdditionalSelfTest.java @@ -21,6 +21,26 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; import junit.framework.TestCase; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCache; @@ -29,9 +49,6 @@ import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.binary.BinaryType; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; @@ -46,27 +63,6 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Assert; -import java.lang.reflect.Field; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Timestamp; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; - import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -1497,7 +1493,5 @@ public class BinaryObjectBuilderAdditionalSelfTest extends GridCommonAbstractTes assert OBJ.equals(binaryObj.type().fieldTypeName("asListHint")); assert OBJ.equals(binaryObj.type().fieldTypeName("asSetHint")); assert OBJ.equals(binaryObj.type().fieldTypeName("asMapHint")); - } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java index c0ba42c..34480a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetEntryAbstractTest.java @@ -463,7 +463,7 @@ public abstract class CacheGetEntryAbstractTest extends GridCacheAbstractSelfTes CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext(); GridCacheMapEntry mapEntry = cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject( - cacheObjCtx, e.getKey(), true)); + cacheObjCtx, null, e.getKey(), true)); assertNotNull("No entry for key: " + e.getKey(), mapEntry); assertEquals(mapEntry.version(), e.version()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java index e166acb..42027e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheTtlCleanupSelfTest.java @@ -82,6 +82,6 @@ public class IgniteCacheTtlCleanupSelfTest extends GridCacheAbstractSelfTest { CacheObjectContext cacheObjCtx = cacheAdapter.context().cacheObjectContext(); for (int i = 0; i < 100; i++) - assertNull(cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(cacheObjCtx, i, true))); + assertNull(cacheAdapter.map().getEntry(cacheObjects.toCacheKeyObject(cacheObjCtx, null, i, true))); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java new file mode 100644 index 0000000..e2de281 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheBinaryKeyConcurrentQueryTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +@SuppressWarnings("unchecked") +public class CacheBinaryKeyConcurrentQueryTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 3; + + /** */ + private static final int KEYS = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setMarshaller(null); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testPutAndQueries() throws Exception { + Ignite ignite = ignite(0); + + IgniteCache cache1 = ignite.createCache(cacheConfiguration("cache1", ATOMIC)); + IgniteCache cache2 = ignite.createCache(cacheConfiguration("cache2", TRANSACTIONAL)); + + insertData(ignite, cache1.getName()); + insertData(ignite, cache2.getName()); + + IgniteInternalFuture<?> fut1 = startUpdate(cache1.getName()); + IgniteInternalFuture<?> fut2 = startUpdate(cache2.getName()); + + fut1.get(); + fut2.get(); + } + + /** + * @param cacheName Cache name. + * @return Future. + */ + private IgniteInternalFuture<?> startUpdate(final String cacheName) { + final long stopTime = System.currentTimeMillis() + 30_000; + + final AtomicInteger idx = new AtomicInteger(); + + return GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Void call() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + IgniteCache cache = ignite(idx.getAndIncrement() % NODES).cache(cacheName).withKeepBinary(); + + while (System.currentTimeMillis() < stopTime) { + switch (rnd.nextInt(5)) { + case 0: { + TestKey key = new TestKey(rnd.nextInt(KEYS)); + + CacheEntry e = cache.getEntry(key); + + assertNotNull(e); + assertTrue(e.getKey() instanceof BinaryObject); + + cache.put(e.getKey(), new TestValue(rnd.nextInt(KEYS))); + + break; + } + + case 1: { + Iterator<Cache.Entry> it = cache.iterator(); + + for (int i = 0; i < 100 && it.hasNext(); i++) { + Cache.Entry e = it.next(); + + assertTrue(e.getKey() instanceof BinaryObject); + + cache.put(e.getKey(), new TestValue(rnd.nextInt(KEYS))); + } + + break; + } + + case 2: { + SqlFieldsQuery qry = new SqlFieldsQuery("select _key " + + "from \"" + cache.getName() + "\".TestValue where id=?"); + + qry.setArgs(rnd.nextInt(KEYS)); + + List<List> res = cache.query(qry).getAll(); + + assertEquals(1, res.size()); + + BinaryObject key = (BinaryObject)res.get(0).get(0); + + cache.put(key, new TestValue(rnd.nextInt(KEYS))); + + break; + } + + case 3: { + SqlQuery qry = new SqlQuery("TestValue", "id=?"); + + qry.setArgs(rnd.nextInt(KEYS)); + + List<Cache.Entry> res = cache.query(qry).getAll(); + + assertEquals(1, res.size()); + + break; + } + + case 4: { + SqlQuery qry = new SqlQuery("TestValue", "order by id"); + + int cnt = 0; + + for (Cache.Entry e : (Iterable<Cache.Entry>)cache.query(qry)) { + assertNotNull(cache.get(e.getKey())); + + cnt++; + } + + assertTrue(cnt > 0); + + break; + } + + default: + fail(); + } + } + + return null; + } + }, NODES * 2, "test-thread"); + } + + /** + * @param ignite Node. + * @param cacheName Cache name. + */ + private void insertData(Ignite ignite, String cacheName) { + try (IgniteDataStreamer streamer = ignite.dataStreamer(cacheName)) { + for (int i = 0; i < KEYS; i++) + streamer.addData(new TestKey(i), new TestValue(i)); + } + } + + /** + * @param name Cache name. + * @param atomicityMode Cache atomicity mode. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setCacheMode(PARTITIONED); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setBackups(1); + + QueryEntity qryEntity = new QueryEntity(); + + qryEntity.setKeyType(TestKey.class.getName()); + qryEntity.setValueType(TestValue.class.getName()); + + qryEntity.addQueryField("id", Integer.class.getName(), null); + qryEntity.addQueryField("val", Integer.class.getName(), null); + + qryEntity.setIndexes(F.asList(new QueryIndex("id"), new QueryIndex("val"))); + + ccfg.setQueryEntities(F.asList(qryEntity)); + + return ccfg; + } + + /** + * + */ + static class TestKey { + /** */ + @QuerySqlField(index = true) + private int id; + + /** + * @param id ID. + */ + public TestKey(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestKey testKey = (TestKey)o; + + return id == testKey.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + static class TestValue { + /** */ + @QuerySqlField(index = true) + private int val; + + /** + * @param val Value. + */ + public TestValue(int val) { + this.val = val; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d07e3e7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index a85b7a6..96e8551 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.CacheBinaryKeyConcurrentQueryTest; import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest; import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest; import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest; @@ -76,6 +77,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(CacheRandomOperationsMultithreadedTest.class); suite.addTestSuite(IgniteCacheStarvationOnRebalanceTest.class); suite.addTestSuite(CacheOperationsWithExpirationTest.class); + suite.addTestSuite(CacheBinaryKeyConcurrentQueryTest.class); return suite; }
