ignite-2042 Added special queue/set key classes to make collocation work with BinaryMarshaller. Also fixed an issue with the 'invoke' result when the binary marshaller is used.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f6c013 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f6c013 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f6c013 Branch: refs/heads/master Commit: 50f6c0131fd761f6231e7c2632a010c093000e70 Parents: 86ec37e Author: sboikov <sboi...@gridgain.com> Authored: Thu Dec 3 16:50:00 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Dec 3 16:50:00 2015 +0300 ---------------------------------------------------------------------- .../internal/portable/BinaryReaderExImpl.java | 2 +- .../internal/portable/PortableContext.java | 34 +- .../processors/cache/GridCacheContext.java | 30 ++ .../CacheDataStructuresManager.java | 31 +- .../dht/atomic/GridDhtAtomicCache.java | 20 +- .../CacheObjectBinaryProcessorImpl.java | 8 + .../cache/query/GridCacheQueryManager.java | 12 +- .../transactions/IgniteTxLocalAdapter.java | 11 +- .../datastructures/CollocatedQueueItemKey.java | 75 ++++ .../datastructures/CollocatedSetItemKey.java | 87 +++++ .../datastructures/DataStructuresProcessor.java | 7 +- .../GridAtomicCacheQueueImpl.java | 8 +- .../datastructures/GridCacheQueueAdapter.java | 30 +- .../datastructures/GridCacheQueueItemKey.java | 9 +- .../datastructures/GridCacheSetImpl.java | 37 +- .../datastructures/GridCacheSetItemKey.java | 21 +- .../GridTransactionalCacheQueueImpl.java | 2 +- .../processors/datastructures/QueueItemKey.java | 27 ++ .../processors/datastructures/SetItemKey.java | 36 ++ .../cache/IgniteCacheInvokeAbstractTest.java | 369 ++++++++++++++----- ...eAbstractDataStructuresFailoverSelfTest.java | 7 +- .../GridCacheQueueApiSelfAbstractTest.java | 18 +- .../GridCacheSetFailoverAbstractSelfTest.java | 6 +- .../GridCachePartitionedQueueApiSelfTest.java | 5 + ...dCachePartitionedQueueEntryMoveSelfTest.java | 2 +- .../IgnitePartitionedQueueNoBackupsTest.java | 92 +++++ .../GridCacheReplicatedQueueApiSelfTest.java | 5 + 
.../GridCacheWriteBehindStoreAbstractTest.java | 2 +- .../IgniteCacheDataStructuresSelfTestSuite.java | 3 + 29 files changed, 778 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java index ddbf6ba..91b67f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java @@ -246,7 +246,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina dataStart = start + DFLT_HDR_LEN; } - idMapper = userType ? ctx.userTypeIdMapper(typeId) : null; + idMapper = userType ? ctx.userTypeIdMapper(typeId) : BinaryInternalIdMapper.defaultInstance(); schema = PortableUtils.hasSchema(flags) ? 
getOrCreateSchema() : null; } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index 1482df9..fd6c41d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ -66,6 +66,8 @@ import org.apache.ignite.binary.BinarySerializer; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.portable.CacheObjectBinaryProcessorImpl; +import org.apache.ignite.internal.processors.datastructures.CollocatedQueueItemKey; +import org.apache.ignite.internal.processors.datastructures.CollocatedSetItemKey; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.GridMapEntry; import org.apache.ignite.internal.util.typedef.F; @@ -233,7 +235,8 @@ public class PortableContext implements Externalizable { /** * @param marsh Portable marshaller. - * @throws org.apache.ignite.binary.BinaryObjectException In case of error. + * @param cfg Configuration. + * @throws BinaryObjectException In case of error. */ public void configure(BinaryMarshaller marsh, IgniteConfiguration cfg) throws BinaryObjectException { if (marsh == null) @@ -265,7 +268,7 @@ public class PortableContext implements Externalizable { * @param globalIdMapper ID mapper. * @param globalSerializer Serializer. * @param typeCfgs Type configurations. - * @throws org.apache.ignite.binary.BinaryObjectException In case of error. + * @throws BinaryObjectException In case of error. 
*/ private void configure( BinaryIdMapper globalIdMapper, @@ -313,9 +316,8 @@ public class PortableContext implements Externalizable { } } - for (TypeDescriptor desc : descs.descriptors()) { + for (TypeDescriptor desc : descs.descriptors()) registerUserType(desc.clsName, desc.idMapper, desc.serializer, desc.affKeyFieldName, desc.isEnum); - } BinaryInternalIdMapper dfltMapper = BinaryInternalIdMapper.create(globalIdMapper); @@ -327,6 +329,20 @@ public class PortableContext implements Externalizable { affKeyFieldNames.putIfAbsent(typeId, entry.getValue()); } + + addSystemClassAffinityKey(CollocatedSetItemKey.class); + addSystemClassAffinityKey(CollocatedQueueItemKey.class); + } + + /** + * @param cls Class. + */ + private void addSystemClassAffinityKey(Class<?> cls) { + String fieldName = affinityFieldName(cls); + + assert fieldName != null : cls; + + affKeyFieldNames.putIfAbsent(cls.getName().hashCode(), affinityFieldName(cls)); } /** @@ -400,7 +416,7 @@ public class PortableContext implements Externalizable { /** * @param cls Class. * @return Class descriptor. - * @throws org.apache.ignite.binary.BinaryObjectException In case of error. + * @throws BinaryObjectException In case of error. */ public PortableClassDescriptor descriptorForClass(Class<?> cls, boolean deserialize) throws BinaryObjectException { @@ -722,7 +738,7 @@ public class PortableContext implements Externalizable { * @param serializer Serializer. * @param affKeyFieldName Affinity key field name. * @param isEnum If enum. - * @throws org.apache.ignite.binary.BinaryObjectException In case of error. + * @throws BinaryObjectException In case of error. */ @SuppressWarnings("ErrorNotRethrown") public void registerUserType(String clsName, @@ -808,7 +824,7 @@ public class PortableContext implements Externalizable { /** * @param typeId Type ID. * @return Meta data. - * @throws org.apache.ignite.binary.BinaryObjectException In case of error. + * @throws BinaryObjectException In case of error. 
*/ @Nullable public BinaryType metadata(int typeId) throws BinaryObjectException { return metaHnd != null ? metaHnd.metadata(typeId) : null; @@ -964,7 +980,7 @@ public class PortableContext implements Externalizable { * @param affKeyFieldName Affinity key field name. * @param isEnum Enum flag. * @param canOverride Whether this descriptor can be override. - * @throws org.apache.ignite.binary.BinaryObjectException If failed. + * @throws BinaryObjectException If failed. */ private void add(String clsName, BinaryIdMapper idMapper, @@ -1044,7 +1060,7 @@ public class PortableContext implements Externalizable { * Override portable class descriptor. * * @param other Other descriptor. - * @throws org.apache.ignite.binary.BinaryObjectException If failed. + * @throws BinaryObjectException If failed. */ private void override(TypeDescriptor other) throws BinaryObjectException { assert clsName.equals(other.clsName); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 5b4f22c..d689ba6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -36,6 +36,7 @@ import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.expiry.EternalExpiryPolicy; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheInterceptor; @@ -53,6 +54,7 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeploymentManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.managers.swapspace.GridSwapSpaceManager; +import org.apache.ignite.internal.portable.BinaryMarshaller; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -1682,6 +1684,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return {@code True} if {@link BinaryMarshaller is configured}. + */ + public boolean binaryMarshaller() { + return marshaller() instanceof BinaryMarshaller; + } + + /** * @return Keep portable flag. */ public boolean keepPortable() { @@ -1752,6 +1761,27 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @param resMap Invoke results map. + * @param keepBinary Keep binary flag. + * @return Unwrapped results. + */ + public Map unwrapInvokeResult(@Nullable Map<Object, EntryProcessorResult> resMap, final boolean keepBinary) { + return F.viewReadOnly(resMap, new C1<EntryProcessorResult, EntryProcessorResult>() { + @Override public EntryProcessorResult apply(EntryProcessorResult res) { + if (res instanceof CacheInvokeResult) { + CacheInvokeResult invokeRes = (CacheInvokeResult)res; + + if (invokeRes.result() != null) + res = CacheInvokeResult.fromResult(unwrapPortableIfNeeded(invokeRes.result(), + keepBinary, false)); + } + + return res; + } + }); + } + + /** * @return Cache object context. 
*/ public CacheObjectContext cacheObjectContext() { http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index ec787f8..6ec29b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -54,9 +54,9 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheQueueProxy; import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeader; import org.apache.ignite.internal.processors.datastructures.GridCacheSetHeaderKey; import org.apache.ignite.internal.processors.datastructures.GridCacheSetImpl; -import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey; import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy; import org.apache.ignite.internal.processors.datastructures.GridTransactionalCacheQueueImpl; +import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; @@ -79,7 +79,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap; /** Set keys used for set iteration. 
*/ - private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<GridCacheSetItemKey>> setDataMap = + private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<SetItemKey>> setDataMap = new ConcurrentHashMap8<>(); /** Queues map. */ @@ -311,12 +311,13 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * * @param key Key. * @param rmv {@code True} if entry was removed. + * @param keepPortable Keep portable flag. */ public void onEntryUpdated(KeyCacheObject key, boolean rmv, boolean keepPortable) { Object key0 = cctx.cacheObjectContext().unwrapPortableIfNeeded(key, keepPortable, false); - if (key0 instanceof GridCacheSetItemKey) - onSetItemUpdated((GridCacheSetItemKey)key0, rmv); + if (key0 instanceof SetItemKey) + onSetItemUpdated((SetItemKey)key0, rmv); } /** @@ -327,11 +328,11 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { public void onPartitionEvicted(int part) { GridCacheAffinityManager aff = cctx.affinity(); - for (GridConcurrentHashSet<GridCacheSetItemKey> set : setDataMap.values()) { - Iterator<GridCacheSetItemKey> iter = set.iterator(); + for (GridConcurrentHashSet<SetItemKey> set : setDataMap.values()) { + Iterator<SetItemKey> iter = set.iterator(); while (iter.hasNext()) { - GridCacheSetItemKey key = iter.next(); + SetItemKey key = iter.next(); if (aff.partition(key) == part) iter.remove(); @@ -415,7 +416,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * @param id Set ID. * @return Data for given set. 
*/ - @Nullable public GridConcurrentHashSet<GridCacheSetItemKey> setData(IgniteUuid id) { + @Nullable public GridConcurrentHashSet<SetItemKey> setData(IgniteUuid id) { return setDataMap.get(id); } @@ -436,7 +437,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { cctx.preloader().syncFuture().get(); } - GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(setId); + GridConcurrentHashSet<SetItemKey> set = setDataMap.get(setId); if (set == null) return; @@ -445,9 +446,9 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { final int BATCH_SIZE = 100; - Collection<GridCacheSetItemKey> keys = new ArrayList<>(BATCH_SIZE); + Collection<SetItemKey> keys = new ArrayList<>(BATCH_SIZE); - for (GridCacheSetItemKey key : set) { + for (SetItemKey key : set) { if (!loc && !aff.primary(cctx.localNode(), key, topVer)) continue; @@ -555,14 +556,14 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * @param key Set item key. * @param rmv {@code True} if item was removed. */ - private void onSetItemUpdated(GridCacheSetItemKey key, boolean rmv) { - GridConcurrentHashSet<GridCacheSetItemKey> set = setDataMap.get(key.setId()); + private void onSetItemUpdated(SetItemKey key, boolean rmv) { + GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId()); if (set == null) { if (rmv) return; - GridConcurrentHashSet<GridCacheSetItemKey> old = setDataMap.putIfAbsent(key.setId(), + GridConcurrentHashSet<SetItemKey> old = setDataMap.putIfAbsent(key.setId(), set = new GridConcurrentHashSet<>()); if (old != null) @@ -592,7 +593,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException If failed. 
*/ @SuppressWarnings("unchecked") - private void retryRemoveAll(final IgniteInternalCache cache, final Collection<GridCacheSetItemKey> keys) + private void retryRemoveAll(final IgniteInternalCache cache, final Collection<SetItemKey> keys) throws IgniteCheckedException { DataStructuresProcessor.retry(log, new Callable<Void>() { @Override public Void call() throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d8ab62a..c5ec258 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -824,25 +824,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return resFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) throws IgniteCheckedException { - Map<K, EntryProcessorResult<T>> resMap = fut.get(); + Map<Object, EntryProcessorResult> resMap = (Map)fut.get(); - if (resMap != null) { - return F.viewReadOnly(resMap, new C1<EntryProcessorResult<T>, EntryProcessorResult<T>>() { - @Override public EntryProcessorResult<T> apply(EntryProcessorResult<T> res) { - if (res instanceof CacheInvokeResult) { - CacheInvokeResult invokeRes = (CacheInvokeResult)res; - - if (invokeRes.result() != null) - res = 
CacheInvokeResult.fromResult((T)ctx.unwrapPortableIfNeeded(invokeRes.result(), - keepBinary, false)); - } - - return res; - } - }); - } - - return null; + return ctx.unwrapInvokeResult(resMap, keepBinary); } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java index 220a45a..d172bca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java @@ -602,6 +602,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm if (affKeyFieldName != null) return po.field(affKeyFieldName); } + else if (po instanceof BinaryObjectEx) { + int id = ((BinaryObjectEx)po).typeId(); + + String affKeyFieldName = portableCtx.affinityKeyFieldName(id); + + if (affKeyFieldName != null) + return po.field(affKeyFieldName); + } } catch (BinaryObjectException e) { U.error(log, "Failed to get affinity field from portable object: " + po, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index bef587a..bb5d230 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -47,8 +47,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey; import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate; +import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryFieldsResult; @@ -761,21 +761,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte IgniteUuid id = filter.setId(); - Collection<GridCacheSetItemKey> data = cctx.dataStructures().setData(id); + Collection<SetItemKey> data = cctx.dataStructures().setData(id); if (data == null) data = Collections.emptyList(); final GridIterator<IgniteBiTuple<K, V>> it = F.iterator( data, - new C1<GridCacheSetItemKey, IgniteBiTuple<K, V>>() { - @Override public IgniteBiTuple<K, V> apply(GridCacheSetItemKey e) { + new C1<SetItemKey, IgniteBiTuple<K, V>>() { + @Override public IgniteBiTuple<K, V> apply(SetItemKey e) { return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE); } }, true, - new P1<GridCacheSetItemKey>() { - @Override public boolean apply(GridCacheSetItemKey e) { + new P1<SetItemKey>() { + @Override public boolean apply(SetItemKey e) { return filter.apply(e, null); } }); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index f13cff4..33c0fa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -3220,8 +3220,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter try { txFut.get(); - return new GridCacheReturn(cacheCtx, true, keepBinary, - implicitRes.value(), implicitRes.success()); + Object res = implicitRes.value(); + + if (implicitRes.invokeResult()) { + assert res == null || res instanceof Map : implicitRes; + + res = cacheCtx.unwrapInvokeResult((Map)res, keepBinary); + } + + return new GridCacheReturn(cacheCtx, true, keepBinary, res, implicitRes.success()); } catch (IgniteCheckedException | RuntimeException e) { rollbackAsync(); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java new file mode 100644 index 0000000..8eb9fa0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedQueueItemKey.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; + +/** + * + */ +public class CollocatedQueueItemKey implements QueueItemKey { + /** */ + private IgniteUuid queueId; + + /** */ + @AffinityKeyMapped + private int queueNameHash; + + /** */ + private long idx; + + /** + * @param queueId Queue unique ID. + * @param queueName Queue name. + * @param idx Item index. 
+ */ + public CollocatedQueueItemKey(IgniteUuid queueId, String queueName, long idx) { + this.queueId = queueId; + this.queueNameHash = queueName.hashCode(); + this.idx = idx; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + CollocatedQueueItemKey itemKey = (CollocatedQueueItemKey)o; + + return idx == itemKey.idx && queueId.equals(itemKey.queueId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = queueId.hashCode(); + + res = 31 * res + (int)(idx ^ (idx >>> 32)); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CollocatedQueueItemKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java new file mode 100644 index 0000000..94cffd4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/CollocatedSetItemKey.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; + +/** + * + */ +public class CollocatedSetItemKey implements SetItemKey { + /** */ + private IgniteUuid setId; + + /** */ + @GridToStringInclude + private Object item; + + /** */ + @AffinityKeyMapped + private int setNameHash; + + /** + * @param setName Set name. + * @param setId Set unique ID. + * @param item Set item. 
+ */ + public CollocatedSetItemKey(String setName, IgniteUuid setId, Object item) { + this.setNameHash = setName.hashCode(); + this.setId = setId; + this.item = item; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid setId() { + return setId; + } + + /** {@inheritDoc} */ + @Override public Object item() { + return item; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = setId.hashCode(); + + res = 31 * res + item.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + CollocatedSetItemKey that = (CollocatedSetItemKey)o; + + return setId.equals(that.setId) && item.equals(that.item); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CollocatedSetItemKey.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 998bd92..9ed9350 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -113,12 +113,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { /** Initial capacity. */ private static final int INITIAL_CAPACITY = 10; - /** */ - private static final int MAX_UPDATE_RETRIES = 100; - - /** */ - private static final long RETRY_DELAY = 1; - /** Initialization latch. 
*/ private final CountDownLatch initLatch = new CountDownLatch(1); @@ -986,6 +980,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { hdr.id(), name, hdr.collocated(), + cctx.binaryMarshaller(), hdr.head(), hdr.tail(), 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java index b433887..58d3efe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridAtomicCacheQueueImpl.java @@ -55,7 +55,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { checkRemoved(idx); - GridCacheQueueItemKey key = itemKey(idx); + QueueItemKey key = itemKey(idx); cache.getAndPut(key, item); @@ -78,7 +78,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { checkRemoved(idx); - GridCacheQueueItemKey key = itemKey(idx); + QueueItemKey key = itemKey(idx); T data = (T)cache.getAndRemove(key); @@ -115,7 +115,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { checkRemoved(idx); - Map<GridCacheQueueItemKey, T> putMap = new HashMap<>(); + Map<QueueItemKey, T> putMap = new HashMap<>(); for (T item : items) { putMap.put(itemKey(idx), item); @@ -140,7 +140,7 @@ public class GridAtomicCacheQueueImpl<T> extends GridCacheQueueAdapter<T> { if (idx != null) { checkRemoved(idx); - GridCacheQueueItemKey key = itemKey(idx); + QueueItemKey key = itemKey(idx); if (cache.remove(key)) return; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index df1bd88..ca0250d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -58,9 +58,6 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp protected static final long QUEUE_REMOVED_IDX = Long.MIN_VALUE; /** */ - protected static final long RETRY_DELAY = 1; - - /** */ private static final int DFLT_CLEAR_BATCH_SIZE = 100; /** Logger. */ @@ -98,6 +95,9 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp @GridToStringExclude private final Semaphore writeSem; + /** */ + private final boolean binaryMarsh; + /** * @param queueName Queue name. * @param hdr Queue hdr. 
@@ -112,6 +112,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp collocated = hdr.collocated(); queueKey = new GridCacheQueueHeaderKey(queueName); cache = cctx.kernalContext().cache().internalCache(cctx.name()); + binaryMarsh = cctx.binaryMarshaller(); log = cctx.logger(getClass()); @@ -369,7 +370,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp checkRemoved(t.get1()); - removeKeys(cache, id, queueName, collocated, t.get1(), t.get2(), batchSize); + removeKeys(cache, id, queueName, collocated, binaryMarsh, t.get1(), t.get2(), batchSize); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -407,6 +408,7 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @param id Queue unique ID. * @param name Queue name. * @param collocated Collocation flag. + * @param binaryMarsh {@code True} if binary marshaller is configured. * @param startIdx Start item index. * @param endIdx End item index. * @param batchSize Batch size. @@ -418,14 +420,15 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp IgniteUuid id, String name, boolean collocated, + boolean binaryMarsh, long startIdx, long endIdx, int batchSize) throws IgniteCheckedException { - Set<GridCacheQueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); + Set<QueueItemKey> keys = new HashSet<>(batchSize > 0 ? batchSize : 10); for (long idx = startIdx; idx < endIdx; idx++) { - keys.add(itemKey(id, name, collocated, idx)); + keys.add(itemKey(id, name, collocated, binaryMarsh, idx)); if (batchSize > 0 && keys.size() == batchSize) { cache.removeAll(keys); @@ -536,8 +539,8 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @param idx Item index. * @return Item key. 
*/ - protected GridCacheQueueItemKey itemKey(Long idx) { - return itemKey(id, queueName, collocated(), idx); + protected QueueItemKey itemKey(Long idx) { + return itemKey(id, queueName, collocated(), binaryMarsh, idx); } /** {@inheritDoc} */ @@ -558,11 +561,18 @@ public abstract class GridCacheQueueAdapter<T> extends AbstractCollection<T> imp * @param id Queue unique ID. * @param queueName Queue name. * @param collocated Collocation flag. + * @param binaryMarsh {@code True} if binary marshaller is configured. * @param idx Item index. * @return Item key. */ - private static GridCacheQueueItemKey itemKey(IgniteUuid id, String queueName, boolean collocated, long idx) { - return collocated ? new CollocatedItemKey(id, queueName, idx) : new GridCacheQueueItemKey(id, queueName, idx); + private static QueueItemKey itemKey(IgniteUuid id, + String queueName, + boolean collocated, + boolean binaryMarsh, + long idx) { + return collocated ? + (binaryMarsh ? new CollocatedQueueItemKey(id, queueName, idx) : new CollocatedItemKey(id, queueName, idx)) : + new GridCacheQueueItemKey(id, queueName, idx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java index c4cb7b1..df47e73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueItemKey.java @@ -21,7 +21,6 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import 
org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -29,7 +28,7 @@ import org.apache.ignite.lang.IgniteUuid; /** * Queue item key. */ -class GridCacheQueueItemKey implements Externalizable, GridCacheInternal { +class GridCacheQueueItemKey implements Externalizable, QueueItemKey { /** */ private static final long serialVersionUID = 0L; @@ -110,11 +109,11 @@ class GridCacheQueueItemKey implements Externalizable, GridCacheInternal { /** {@inheritDoc} */ @Override public int hashCode() { - int result = queueId.hashCode(); + int res = queueId.hashCode(); - result = 31 * result + (int)(idx ^ (idx >>> 32)); + res = 31 * res + (int)(idx ^ (idx >>> 32)); - return result; + return res; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 62eab61..f25e361 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -66,7 +66,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite private final GridCacheContext ctx; /** Cache. */ - private final IgniteInternalCache<GridCacheSetItemKey, Boolean> cache; + private final IgniteInternalCache<SetItemKey, Boolean> cache; /** Logger. 
*/ private final IgniteLogger log; @@ -86,6 +86,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite /** Removed flag. */ private volatile boolean rmvd; + /** */ + private final boolean binaryMarsh; + /** * @param ctx Cache context. * @param name Set name. @@ -97,6 +100,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite this.name = name; id = hdr.id(); collocated = hdr.collocated(); + binaryMarsh = ctx.binaryMarshaller(); cache = ctx.cache(); @@ -140,7 +144,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite onAccess(); if (ctx.isLocal() || ctx.isReplicated()) { - GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id); + GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id); return set != null ? set.size() : 0; } @@ -171,7 +175,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @Override public boolean isEmpty() { onAccess(); - GridConcurrentHashSet<GridCacheSetItemKey> set = ctx.dataStructures().setData(id); + GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id); return (set == null || set.isEmpty()) && size() == 0; } @@ -180,7 +184,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @Override public boolean contains(Object o) { onAccess(); - final GridCacheSetItemKey key = itemKey(o); + final SetItemKey key = itemKey(o); return retry(new Callable<Boolean>() { @Override public Boolean call() throws Exception { @@ -193,7 +197,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @Override public boolean add(T o) { onAccess(); - final GridCacheSetItemKey key = itemKey(o); + final SetItemKey key = itemKey(o); return retry(new Callable<Boolean>() { @Override public Boolean call() throws Exception { @@ -206,7 +210,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite 
@Override public boolean remove(Object o) { onAccess(); - final GridCacheSetItemKey key = itemKey(o); + final SetItemKey key = itemKey(o); return retry(new Callable<Boolean>() { @Override public Boolean call() throws Exception { @@ -231,7 +235,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite boolean add = false; - Map<GridCacheSetItemKey, Boolean> addKeys = null; + Map<SetItemKey, Boolean> addKeys = null; for (T obj : c) { if (add) { @@ -247,7 +251,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } } else - add |= add(obj); + add = add(obj); } if (!F.isEmpty(addKeys)) @@ -262,7 +266,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite boolean rmv = false; - Set<GridCacheSetItemKey> rmvKeys = null; + Set<SetItemKey> rmvKeys = null; for (Object obj : c) { if (rmv) { @@ -278,7 +282,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } } else - rmv |= remove(obj); + rmv = remove(obj); } if (!F.isEmpty(rmvKeys)) @@ -295,7 +299,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite try (GridCloseableIterator<T> iter = iterator0()) { boolean rmv = false; - Set<GridCacheSetItemKey> rmvKeys = null; + Set<SetItemKey> rmvKeys = null; for (T val : iter) { if (!c.contains(val)) { @@ -331,7 +335,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite onAccess(); try (GridCloseableIterator<T> iter = iterator0()) { - Collection<GridCacheSetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE); + Collection<SetItemKey> rmvKeys = new ArrayList<>(BATCH_SIZE); for (T val : iter) { rmvKeys.add(itemKey(val)); @@ -425,7 +429,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite /** * @param keys Keys to remove. 
*/ - private void retryRemoveAll(final Collection<GridCacheSetItemKey> keys) { + private void retryRemoveAll(final Collection<SetItemKey> keys) { retry(new Callable<Void>() { @Override public Void call() throws Exception { cache.removeAll(keys); @@ -438,7 +442,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite /** * @param keys Keys to remove. */ - private void retryPutAll(final Map<GridCacheSetItemKey, Boolean> keys) { + private void retryPutAll(final Map<SetItemKey, Boolean> keys) { retry(new Callable<Void>() { @Override public Void call() throws Exception { cache.putAll(keys); @@ -523,8 +527,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite * @param item Set item. * @return Item key. */ - private GridCacheSetItemKey itemKey(Object item) { - return collocated ? new CollocatedItemKey(name, id, item) : new GridCacheSetItemKey(id, item); + private SetItemKey itemKey(Object item) { + return collocated ? (binaryMarsh ? new CollocatedSetItemKey(name, id, item) : new CollocatedItemKey(name, id, item)) + : new GridCacheSetItemKey(id, item); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java index d025dce..8b47b3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetItemKey.java @@ -21,7 +21,6 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import 
org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -30,7 +29,7 @@ import org.apache.ignite.lang.IgniteUuid; /** * Set item key. */ -public class GridCacheSetItemKey implements GridCacheInternal, Externalizable { +public class GridCacheSetItemKey implements SetItemKey, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -57,27 +56,23 @@ public class GridCacheSetItemKey implements GridCacheInternal, Externalizable { this.item = item; } - /** - * @return Set UUID. - */ - public IgniteUuid setId() { + /** {@inheritDoc} */ + @Override public IgniteUuid setId() { return setId; } - /** - * @return Set item. - */ - public Object item() { + /** {@inheritDoc} */ + @Override public Object item() { return item; } /** {@inheritDoc} */ @Override public int hashCode() { - int result = setId.hashCode(); + int res = setId.hashCode(); - result = 31 * result + item.hashCode(); + res = 31 * res + item.hashCode(); - return result; + return res; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 4880324..32e94d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -143,7 +143,7 @@ public class 
GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> if (idx != null) { checkRemoved(idx); - Map<GridCacheQueueItemKey, T> putMap = new HashMap<>(); + Map<QueueItemKey, T> putMap = new HashMap<>(); for (T item : items) { putMap.put(itemKey(idx), item); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java new file mode 100644 index 0000000..fe0cef3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/QueueItemKey.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.internal.processors.cache.GridCacheInternal; + +/** + * + */ +public interface QueueItemKey extends GridCacheInternal { + // No-op. 
+} http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java new file mode 100644 index 0000000..759945a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/SetItemKey.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.datastructures; + +import org.apache.ignite.internal.processors.cache.GridCacheInternal; +import org.apache.ignite.lang.IgniteUuid; + +/** + * + */ +public interface SetItemKey extends GridCacheInternal { + /** + * @return Set UUID. + */ + public IgniteUuid setId(); + + /** + * @return Set item. 
+ */ + public Object item(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java index b881d90..51a70b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeAbstractTest.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -139,6 +140,31 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT tx = startTx(txMode); + TestValue testVal = cache.invoke(key, new UserClassValueProcessor()); + + if (tx != null) + tx.commit(); + + assertEquals("63", testVal.value()); + + checkValue(key, 63); + + tx = startTx(txMode); + + Collection<TestValue> testValCol = cache.invoke(key, new CollectionReturnProcessor()); + + if (tx != null) + tx.commit(); + + assertEquals(10, testValCol.size()); + + for (TestValue val : testValCol) + assertEquals("64", val.value()); + + checkValue(key, 63); + + tx = startTx(txMode); + GridTestUtils.assertThrows(log, new Callable<Void>() { @Override public Void call() throws Exception { cache.invoke(key, new ExceptionProcessor(63)); @@ -237,166 +263,226 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT IncrementProcessor incProcessor = new IncrementProcessor(); - Transaction tx = startTx(txMode); + { + Transaction tx = startTx(txMode); - 
Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor); + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); - Map<Object, Object> exp = new HashMap<>(); + Map<Object, Object> exp = new HashMap<>(); - for (Integer key : keys) - exp.put(key, -1); + for (Integer key : keys) + exp.put(key, -1); - checkResult(resMap, exp); + checkResult(resMap, exp); - for (Integer key : keys) - checkValue(key, 1); + for (Integer key : keys) + checkValue(key, 1); + } - tx = startTx(txMode); + { + Transaction tx = startTx(txMode); - resMap = cache.invokeAll(keys, incProcessor); + Map<Integer, EntryProcessorResult<TestValue>> resMap = cache.invokeAll(keys, new UserClassValueProcessor()); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); - exp = new HashMap<>(); + Map<Object, Object> exp = new HashMap<>(); - for (Integer key : keys) - exp.put(key, 1); + for (Integer key : keys) + exp.put(key, new TestValue("1")); - checkResult(resMap, exp); + checkResult(resMap, exp); - for (Integer key : keys) - checkValue(key, 2); + for (Integer key : keys) + checkValue(key, 1); + } - tx = startTx(txMode); + { + Transaction tx = startTx(txMode); - resMap = cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30); + Map<Integer, EntryProcessorResult<Collection<TestValue>>> resMap = + cache.invokeAll(keys, new CollectionReturnProcessor()); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); - for (Integer key : keys) - exp.put(key, 3); + Map<Object, Object> exp = new HashMap<>(); - checkResult(resMap, exp); + for (Integer key : keys) { + List<TestValue> expCol = new ArrayList<>(); - for (Integer key : keys) - checkValue(key, 62); + for (int i = 0; i < 10; i++) + expCol.add(new TestValue("2")); - tx = startTx(txMode); + exp.put(key, expCol); + } - resMap = cache.invokeAll(keys, new ExceptionProcessor(null)); + checkResult(resMap, 
exp); - if (tx != null) - tx.commit(); + for (Integer key : keys) + checkValue(key, 1); + } - for (Integer key : keys) { - final EntryProcessorResult<Integer> res = resMap.get(key); + { + Transaction tx = startTx(txMode); - assertNotNull("No result for " + key); + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, incProcessor); - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - res.get(); + if (tx != null) + tx.commit(); - return null; - } - }, EntryProcessorException.class, "Test processor exception."); + Map<Object, Object> exp = new HashMap<>(); + + for (Integer key : keys) + exp.put(key, 1); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 2); } - for (Integer key : keys) - checkValue(key, 62); + { + Transaction tx = startTx(txMode); - tx = startTx(txMode); + Map<Integer, EntryProcessorResult<Integer>> resMap = + cache.invokeAll(keys, new ArgumentsSumProcessor(), 10, 20, 30); - Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>(); + if (tx != null) + tx.commit(); + + Map<Object, Object> exp = new HashMap<>(); + + for (Integer key : keys) + exp.put(key, 3); + + checkResult(resMap, exp); + + for (Integer key : keys) + checkValue(key, 62); + } + + { + Transaction tx = startTx(txMode); - for (Integer key : keys) { - switch (key % 4) { - case 0: invokeMap.put(key, new IncrementProcessor()); break; + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new ExceptionProcessor(null)); - case 1: invokeMap.put(key, new RemoveProcessor(62)); break; + if (tx != null) + tx.commit(); - case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break; + for (Integer key : keys) { + final EntryProcessorResult<Integer> res = resMap.get(key); - case 3: invokeMap.put(key, new ExceptionProcessor(62)); break; + assertNotNull("No result for " + key); - default: - fail(); + GridTestUtils.assertThrows(log, new 
Callable<Void>() { + @Override public Void call() throws Exception { + res.get(); + + return null; + } + }, EntryProcessorException.class, "Test processor exception."); } + + for (Integer key : keys) + checkValue(key, 62); } - resMap = cache.invokeAll(invokeMap, 10, 20, 30); + { + Transaction tx = startTx(txMode); - if (tx != null) - tx.commit(); + Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>(); - for (Integer key : keys) { - final EntryProcessorResult<Integer> res = resMap.get(key); + for (Integer key : keys) { + switch (key % 4) { + case 0: invokeMap.put(key, new IncrementProcessor()); break; - switch (key % 4) { - case 0: { - assertNotNull("No result for " + key, res); + case 1: invokeMap.put(key, new RemoveProcessor(62)); break; - assertEquals(62, (int)res.get()); + case 2: invokeMap.put(key, new ArgumentsSumProcessor()); break; - checkValue(key, 63); + case 3: invokeMap.put(key, new ExceptionProcessor(62)); break; - break; + default: + fail(); } + } - case 1: { - assertNull(res); + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(invokeMap, 10, 20, 30); - checkValue(key, null); + if (tx != null) + tx.commit(); - break; - } + for (Integer key : keys) { + final EntryProcessorResult<Integer> res = resMap.get(key); - case 2: { - assertNotNull("No result for " + key, res); + switch (key % 4) { + case 0: { + assertNotNull("No result for " + key, res); - assertEquals(3, (int)res.get()); + assertEquals(62, (int)res.get()); - checkValue(key, 122); + checkValue(key, 63); - break; - } + break; + } + + case 1: { + assertNull(res); + + checkValue(key, null); + + break; + } + + case 2: { + assertNotNull("No result for " + key, res); + + assertEquals(3, (int)res.get()); + + checkValue(key, 122); - case 3: { - assertNotNull("No result for " + key, res); + break; + } - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - res.get(); + case 3: { + assertNotNull("No 
result for " + key, res); - return null; - } - }, EntryProcessorException.class, "Test processor exception."); + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + res.get(); - checkValue(key, 62); + return null; + } + }, EntryProcessorException.class, "Test processor exception."); - break; + checkValue(key, 62); + + break; + } } } } cache.invokeAll(keys, new IncrementProcessor()); - tx = startTx(txMode); + { + Transaction tx = startTx(txMode); - resMap = cache.invokeAll(keys, new RemoveProcessor(null)); + Map<Integer, EntryProcessorResult<Integer>> resMap = cache.invokeAll(keys, new RemoveProcessor(null)); - if (tx != null) - tx.commit(); + if (tx != null) + tx.commit(); - assertEquals("Unexpected results: " + resMap, 0, resMap.size()); + assertEquals("Unexpected results: " + resMap, 0, resMap.size()); - for (Integer key : keys) - checkValue(key, null); + for (Integer key : keys) + checkValue(key, null); + } IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); @@ -406,9 +492,9 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT IgniteFuture<Map<Integer, EntryProcessorResult<Integer>>> fut = asyncCache.future(); - resMap = fut.get(); + Map<Integer, EntryProcessorResult<Integer>> resMap = fut.get(); - exp = new HashMap<>(); + Map<Object, Object> exp = new HashMap<>(); for (Integer key : keys) exp.put(key, -1); @@ -418,7 +504,7 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT for (Integer key : keys) checkValue(key, 1); - invokeMap = new HashMap<>(); + Map<Integer, EntryProcessor<Integer, Integer, Integer>> invokeMap = new HashMap<>(); for (Integer key : keys) invokeMap.put(key, incProcessor); @@ -442,15 +528,16 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT * @param resMap Result map. * @param exp Expected results. 
*/ - private void checkResult(Map<Integer, EntryProcessorResult<Integer>> resMap, Map<Object, Object> exp) { + @SuppressWarnings("unchecked") + private void checkResult(Map resMap, Map<Object, Object> exp) { assertNotNull(resMap); assertEquals(exp.size(), resMap.size()); for (Map.Entry<Object, Object> expVal : exp.entrySet()) { - EntryProcessorResult<Integer> res = resMap.get(expVal.getKey()); + EntryProcessorResult<?> res = (EntryProcessorResult)resMap.get(expVal.getKey()); - assertNotNull("No result for " + expVal.getKey()); + assertNotNull("No result for " + expVal.getKey(), res); assertEquals("Unexpected result for " + expVal.getKey(), res.get(), expVal.getValue()); } @@ -557,6 +644,44 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT /** * */ + protected static class UserClassValueProcessor implements EntryProcessor<Integer, Integer, TestValue> { + /** {@inheritDoc} */ + @Override public TestValue process(MutableEntry<Integer, Integer> e, Object... arguments) + throws EntryProcessorException { + return new TestValue(String.valueOf(e.getValue())); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(UserClassValueProcessor.class, this); + } + } + + /** + * + */ + protected static class CollectionReturnProcessor implements + EntryProcessor<Integer, Integer, Collection<TestValue>> { + /** {@inheritDoc} */ + @Override public Collection<TestValue> process(MutableEntry<Integer, Integer> e, Object... 
arguments) + throws EntryProcessorException { + List<TestValue> vals = new ArrayList<>(); + + for (int i = 0; i < 10; i++) + vals.add(new TestValue(String.valueOf(e.getValue() + 1))); + + return vals; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CollectionReturnProcessor.class, this); + } + } + + /** + * + */ protected static class IncrementProcessor implements EntryProcessor<Integer, Integer, Integer> { /** {@inheritDoc} */ @Override public Integer process(MutableEntry<Integer, Integer> e, @@ -656,4 +781,50 @@ public abstract class IgniteCacheInvokeAbstractTest extends IgniteCacheAbstractT return S.toString(ExceptionProcessor.class, this); } } + + /** + * + */ + static class TestValue { + /** */ + private String val; + + /** + * @param val Value. + */ + public TestValue(String val) { + this.val = val; + } + + /** + * @return Value. + */ + public String value() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue testVal = (TestValue) o; + + return val.equals(testVal.val); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestValue.class, this); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java index 
2751de1..ef96d9f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheAbstractDataStructuresFailoverSelfTest.java @@ -170,11 +170,8 @@ public abstract class GridCacheAbstractDataStructuresFailoverSelfTest extends Ig while (U.currentTimeMillis() < stopTime) assertEquals(10, atomic.get()); } - catch (IgniteException e) { - if (X.hasCause(e, ClusterTopologyServerNotFoundException.class)) - return; - - throw e; + catch (IgniteException ignore) { + return; // Test that client does not hang. } fail(); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java index cf638df..6366f09 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheQueueApiSelfAbstractTest.java @@ -244,15 +244,27 @@ public abstract class GridCacheQueueApiSelfAbstractTest extends IgniteCollection } /** - * JUnit. - * * @throws Exception If failed. */ public void testIterator() throws Exception { + checkIterator(false); + } + + /** + * @throws Exception If failed. + */ + public void testIteratorCollocated() throws Exception { + checkIterator(true); + } + + /** + * @param collocated Collocated flag. 
+ */ + private void checkIterator(boolean collocated) { // Random queue name. String queueName = UUID.randomUUID().toString(); - IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(false)); + IgniteQueue<String> queue = grid(0).queue(queueName, 0, config(collocated)); for (int i = 0; i < 100; i++) assert queue.add(Integer.toString(i)); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java index 74c9a4f..ca57205 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetFailoverAbstractSelfTest.java @@ -31,7 +31,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; -import org.apache.ignite.internal.processors.datastructures.GridCacheSetItemKey; +import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.testframework.GridTestUtils; @@ -183,8 +183,8 @@ public abstract class GridCacheSetFailoverAbstractSelfTest extends IgniteCollect if (entry.hasValue()) { cnt++; - if (entry.key() instanceof GridCacheSetItemKey) { - GridCacheSetItemKey setItem = (GridCacheSetItemKey)entry.key(); + if (entry.key() instanceof 
SetItemKey) { + SetItemKey setItem = (SetItemKey)entry.key(); if (setIds.add(setItem.setId())) log.info("Unexpected set item [setId=" + setItem.setId() + http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java index 2420153..de2fa07 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueApiSelfTest.java @@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED; */ public class GridCachePartitionedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest { /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return PARTITIONED; } http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index 
1d225a6..db11291 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -90,7 +90,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends IgniteCollection * @throws Exception If failed. */ public void testQueue() throws Exception { - final String queueName = "queue-test-name"; + final String queueName = "q"; System.out.println(U.filler(20, '\n')); http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java new file mode 100644 index 0000000..880c638 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/partitioned/IgnitePartitionedQueueNoBackupsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.datastructures.partitioned; + +import java.util.Iterator; +import java.util.UUID; +import org.apache.ignite.IgniteQueue; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.testframework.GridTestUtils; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class IgnitePartitionedQueueNoBackupsTest extends GridCachePartitionedQueueApiSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode collectionCacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheMemoryMode collectionMemoryMode() { + return ONHEAP_TIERED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode collectionCacheAtomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CollectionConfiguration collectionConfiguration() { + CollectionConfiguration colCfg = super.collectionConfiguration(); + + colCfg.setBackups(0); + + return colCfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testCollocation() throws Exception { + IgniteQueue<Integer> queue = grid(0).queue("queue", 0, config(true)); + + for (int i = 0; i < 1000; i++) + assertTrue(queue.add(i)); + + assertEquals(1000, queue.size()); + + GridCacheContext cctx = GridTestUtils.getFieldValue(queue, "cctx"); + + UUID setNodeId = null; + + for (int i = 0; i < gridCount(); i++) { + IgniteKernal grid = (IgniteKernal)grid(i); + + Iterator<GridCacheEntryEx> entries = + grid.context().cache().internalCache(cctx.name()).map().allEntries0().iterator(); + + if (entries.hasNext()) { + if (setNodeId == null) + setNodeId = grid.localNode().id(); + else + fail("For collocated queue all items should be stored on single node."); + } + } + }} http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java index 1aea6d9..bad37a9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/replicated/GridCacheReplicatedQueueApiSelfTest.java @@ -31,6 +31,11 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; */ public class GridCacheReplicatedQueueApiSelfTest extends GridCacheQueueApiSelfAbstractTest { /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ @Override protected CacheMode collectionCacheMode() { return REPLICATED; } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f6c013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java index 4a5141e..e9674f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java @@ -114,7 +114,7 @@ public abstract class GridCacheWriteBehindStoreAbstractTest extends GridCommonAb Map<Integer, String> map = store.getMap(); - assert map.isEmpty(); + assert map.isEmpty() : map; Transaction tx = grid().transactions().txStart(OPTIMISTIC, REPEATABLE_READ);