IGNITE-3075 Fixed condition for 'single' request creation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d32fa21b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d32fa21b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d32fa21b Branch: refs/heads/ignite-4242 Commit: d32fa21b673814b060d2362f06ff44838e9c2cdc Parents: f2dc1d7 Author: sboikov <[email protected]> Authored: Tue Nov 22 11:33:55 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Nov 22 11:33:55 2016 +0300 ---------------------------------------------------------------------- .../GridDhtAtomicAbstractUpdateFuture.java | 19 ++++++++----- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../atomic/GridDhtAtomicSingleUpdateFuture.java | 30 +++++++++++++------- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 10 +++++-- .../GridNearAtomicAbstractUpdateRequest.java | 5 ---- .../atomic/GridNearAtomicFullUpdateRequest.java | 5 ---- .../GridNearAtomicSingleUpdateRequest.java | 9 ------ ...CacheLoadingConcurrentGridStartSelfTest.java | 6 +++- 8 files changed, 44 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 7e4c4e0..361fbe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -86,9 +86,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** Update response. */ final GridNearAtomicUpdateResponse updateRes; - /** Force transform backup flag. */ - private boolean forceTransformBackups; - /** Mappings. */ @GridToStringInclude protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings; @@ -198,7 +195,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte writeVer, syncMode, topVer, - forceTransformBackups); + ttl, + conflictExpireTime, + conflictVer); mappings.put(nodeId, updateReq); } @@ -265,7 +264,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte writeVer, syncMode, topVer, - forceTransformBackups); + ttl, + expireTime, + null); mappings.put(nodeId, updateReq); } @@ -404,7 +405,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte * @param writeVer Update version. * @param syncMode Write synchronization mode. * @param topVer Topology version. - * @param forceTransformBackups Force transform backups flag. + * @param ttl TTL. + * @param conflictExpireTime Conflict expire time. + * @param conflictVer Conflict version. * @return Request. */ protected abstract GridDhtAtomicAbstractUpdateRequest createRequest( @@ -413,7 +416,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, - boolean forceTransformBackups + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer ); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 2a7055d..940c74e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2369,7 +2369,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; - boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), + boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.partition(), req.topologyVersion()); Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 656caab..20d6e90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -30,10 +30,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * @@ -100,11 +102,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, - boolean forceTransformBackups + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer ) { - if (canUseSingleRequest(node)) { - assert !forceTransformBackups; - + if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) { return new GridDhtAtomicSingleUpdateRequest( cctx.cacheId(), node.id(), @@ -126,10 +128,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture writeVer, syncMode, topVer, - forceTransformBackups, + false, updateReq.subjectId(), updateReq.taskNameHash(), - forceTransformBackups ? updateReq.invokeArguments() : null, + null, cctx.deploymentEnabled(), updateReq.keepBinary(), updateReq.skipStore()); @@ -166,13 +168,19 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture /** * @param node Target node - * @return {@code true} if target node supports {@link GridNearAtomicSingleUpdateRequest} + * @param ttl TTL. + * @param conflictExpireTime Conflict expire time. + * @param conflictVer Conflict version. + * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}. */ - private boolean canUseSingleRequest(ClusterNode node) { + private boolean canUseSingleRequest(ClusterNode node, + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer) { return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 && - cctx.expiry() == null && - updateReq.expiry() == null && - !updateReq.hasConflictData(); + (ttl == CU.TTL_NOT_CHANGED) && + (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) && + conflictVer == null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index dd1f1c4..efb35c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * DHT atomic cache backup update future. @@ -126,7 +127,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, - boolean forceTransformBackups) { + long ttl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer + ) { return new GridDhtAtomicUpdateRequest( cctx.cacheId(), node.id(), @@ -134,10 +138,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { writeVer, syncMode, topVer, - forceTransformBackups, + false, updateReq.subjectId(), updateReq.taskNameHash(), - forceTransformBackups ? updateReq.invokeArguments() : null, + null, cctx.deploymentEnabled(), updateReq.keepBinary(), updateReq.skipStore()); http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index bae9e3a..bee2ecd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -223,9 +223,4 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa * @return Key. */ public abstract KeyCacheObject key(int idx); - - /** - * @return {@code True} if request does not have conflict data. - */ - public abstract boolean hasConflictData(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index c785828..1b11688 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -534,11 +534,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } /** {@inheritDoc} */ - @Override public boolean hasConflictData() { - return F.size(conflictVers) > 0 || conflictTtls != null || conflictExpireTimes != null; - } - - /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index f3b9726..1c1addd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -227,15 +227,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin } /** {@inheritDoc} */ - @Override public boolean hasConflictData() { - return false; - } - - /** - * {@inheritDoc} - * - * @param ctx - */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); http://git-wip-us.apache.org/repos/asf/ignite/blob/d32fa21b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java index 0801691..ce64e1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLoadingConcurrentGridStartSelfTest.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -77,6 +78,8 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(PARTITIONED); @@ -95,7 +98,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT else cfg.setCacheConfiguration(ccfg); - if (!configured) + if (!configured) { ccfg.setNodeFilter(new P1<ClusterNode>() { @Override public boolean apply(ClusterNode node) { String name = node.attribute(ATTR_GRID_NAME).toString(); @@ -103,6 +106,7 @@ public class CacheLoadingConcurrentGridStartSelfTest extends GridCommonAbstractT return !getTestGridName(0).equals(name); } }); + } return cfg; }
