Repository: ignite Updated Branches: refs/heads/ignite-1843 77a3f64f1 -> 1d3108603
http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java new file mode 100644 index 0000000..d1299dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -0,0 +1,1093 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; +import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; 
+import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; + +/** + * DHT atomic cache near update future. + */ +public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object> + implements GridCacheAtomicFuture<Object>{ + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + protected static IgniteLogger log; + + /** Cache context. */ + private final GridCacheContext cctx; + + /** Cache. */ + private GridDhtAtomicCache cache; + + /** Update operation. */ + private final GridCacheOperation op; + + /** Key. */ + private Object key; + + /** Value or transform closure. */ + @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) + private Object val; + + /** Optional arguments for entry processor. */ + private Object[] invokeArgs; + + /** Return value require flag. */ + private final boolean retval; + + /** Expiry policy. */ + private final ExpiryPolicy expiryPlc; + + /** Optional filter. */ + private final CacheEntryPredicate[] filter; + + /** Write synchronization mode. 
*/ + private final CacheWriteSynchronizationMode syncMode; + + /** Raw return value flag. */ + private final boolean rawRetval; + + /** Fast map flag. */ + private final boolean fastMap; + + /** Near cache flag. */ + private final boolean nearEnabled; + + /** Subject ID. */ + private final UUID subjId; + + /** Task name hash. */ + private final int taskNameHash; + + /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ + private boolean topLocked; + + /** Skip store flag. */ + private final boolean skipStore; + + /** Wait for topology future flag. */ + private final boolean waitTopFut; + + /** Remap count. */ + private int remapCnt; + + /** State. */ + private final UpdateState state; + + /** + * @param cctx Cache context. + * @param cache Cache instance. + * @param syncMode Write synchronization mode. + * @param op Update operation. + * @param key Key to update. + * @param val Value or transform closure. + * @param invokeArgs Optional arguments for entry processor. + * @param retval Return value require flag. + * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. + * @param expiryPlc Expiry policy explicitly specified for cache operation. + * @param filter Entry filter. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip store flag. + * @param remapCnt Maximum number of retries. + * @param waitTopFut If {@code false} does not wait for affinity change future. 
+ */ + public GridNearAtomicSingleUpdateFuture( + GridCacheContext cctx, + GridDhtAtomicCache cache, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + Object key, + Object val, + @Nullable Object[] invokeArgs, + final boolean retval, + final boolean rawRetval, + @Nullable ExpiryPolicy expiryPlc, + final CacheEntryPredicate[] filter, + UUID subjId, + int taskNameHash, + boolean skipStore, + int remapCnt, + boolean waitTopFut + ) { + this.rawRetval = rawRetval; + + assert subjId != null; + + this.cctx = cctx; + this.cache = cache; + this.syncMode = syncMode; + this.op = op; + this.key = key; + this.val = val; + this.invokeArgs = invokeArgs; + this.retval = retval; + this.expiryPlc = expiryPlc; + this.filter = filter; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.skipStore = skipStore; + this.waitTopFut = waitTopFut; + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); + + fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && + cctx.config().getAtomicWriteOrderMode() == CLOCK && + !(cctx.writeThrough() && cctx.config().getInterceptor() != null); + + nearEnabled = CU.isNearEnabled(cctx); + + if (!waitTopFut) + remapCnt = 1; + + this.remapCnt = remapCnt; + + state = new UpdateState(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public GridCacheVersion version() { + return state.futureVersion(); + } + + /** {@inheritDoc} */ + @Override public Collection<? extends ClusterNode> nodes() { + throw new UnsupportedOperationException(); + } + + /** + * @return {@code True} if this future should block partition map exchange. + */ + private boolean waitForPartitionExchange() { + // Wait fast-map near atomic update futures in CLOCK mode. 
+ return fastMap; + } + + /** {@inheritDoc} */ + @Override public Collection<?> keys() { + return Collections.singleton(key); + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + state.onNodeLeft(nodeId); + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return true; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + @Override + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res) { + assert false; + + throw new UnsupportedOperationException(); + } + + @Override + public void onResult(UUID nodeId) { + assert false; + + throw new UnsupportedOperationException(); + } + + /** + * Performs future mapping. + */ + public void map() { + AffinityTopologyVersion topVer = null; + + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null); + + if (tx != null && tx.topologyVersionSnapshot() != null) + topVer = tx.topologyVersionSnapshot(); + + if (topVer == null) + topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + + if (topVer == null) + mapOnTopology(); + else { + topLocked = true; + + // Cannot remap. + remapCnt = 1; + + state.map(topVer); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) { + if (waitForPartitionExchange()) { + GridFutureAdapter<Void> fut = state.completeFuture(topVer); + + if (fut != null && isDone()) { + fut.onDone(); + + return null; + } + + return fut; + } + + return null; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + assert res == null || res instanceof GridCacheReturn; + + GridCacheReturn ret = (GridCacheReturn)res; + + Object retval = + res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? 
ret.value() : ret.success(); + + if (op == TRANSFORM && retval == null) + retval = Collections.emptyMap(); + + if (super.onDone(retval, err)) { + GridCacheVersion futVer = state.onFutureDone(); + + if (futVer != null) + cctx.mvcc().removeAtomicFuture(futVer); + + return true; + } + + return false; + } + + /** + * Response callback. + * + * @param nodeId Node ID. + * @param res Update response. + */ + public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + state.onResult(nodeId, res, false); + } + + /** + * Updates near cache. + * + * @param req Update request. + * @param res Update response. + */ + private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + if (!nearEnabled || !req.hasPrimary()) + return; + + GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); + + near.processNearAtomicUpdateResponse(req, res); + } + + /** + * Maps future on ready topology. + */ + private void mapOnTopology() { + cache.topology().readLock(); + + AffinityTopologyVersion topVer = null; + + try { + if (cache.topology().stopping()) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cache.name())); + + return; + } + + GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); + + if (fut.isDone()) { + Throwable err = fut.validateCache(cctx); + + if (err != null) { + onDone(err); + + return; + } + + topVer = fut.topologyVersion(); + } + else { + if (waitTopFut) { + assert !topLocked : this; + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + mapOnTopology(); + } + }); + } + }); + } + else + onDone(new GridCacheTryPutFailedException()); + + return; + } + } + finally { + cache.topology().readUnlock(); + } + + state.map(topVer); + } + + /** + * @return {@code True} future 
is stored by {@link GridCacheMvccManager#addAtomicFuture}. + */ + private boolean storeFuture() { + return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; + } + + /** + * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near + * node and send updates in parallel to all participating nodes. + * + * @param key Key to map. + * @param topVer Topology version to map. + * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. + * @return Collection of nodes to which key is mapped. + */ + private Collection<ClusterNode> mapKey( + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean fastMap + ) { + GridCacheAffinityManager affMgr = cctx.affinity(); + + // If we can send updates in parallel - do it. + return fastMap ? + cctx.topology().nodes(affMgr.partition(key), topVer) : + Collections.singletonList(affMgr.primary(key, topVer)); + } + + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } + } + + /** + * Sends messages to remote nodes and updates local cache. + * + * @param mappings Mappings to send. 
+ */ + private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { + UUID locNodeId = cctx.localNodeId(); + + GridNearAtomicUpdateRequest locUpdate = null; + + // Send messages to remote nodes first, then run local update. + for (GridNearAtomicUpdateRequest req : mappings.values()) { + if (locNodeId.equals(req.nodeId())) { + assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + + ", req=" + req + ']'; + + locUpdate = req; + } + else { + try { + if (log.isDebugEnabled()) + log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + state.onSendError(req, e); + } + } + } + + if (locUpdate != null) { + cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res); + } + }); + } + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, null, true)); + } + + /** + * + */ + private class UpdateState { + /** Current topology version. */ + private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; + + /** */ + private GridCacheVersion updVer; + + /** Topology version when got mapping error. */ + private AffinityTopologyVersion mapErrTopVer; + + /** Mappings if operation is mapped to more than one node. */ + @GridToStringInclude + private Map<UUID, GridNearAtomicUpdateRequest> mappings; + + /** Error. */ + private CachePartialUpdateCheckedException err; + + /** Future ID. */ + private GridCacheVersion futVer; + + /** Completion future for a particular topology version. */ + private GridFutureAdapter<Void> topCompleteFut; + + /** Key to remap. */ + private KeyCacheObject remapKey; + + /** Not null if operation is mapped to single node. 
*/ + private GridNearAtomicUpdateRequest singleReq; + + /** Operation result. */ + private GridCacheReturn opRes; + + /** + * @return Future version. + */ + @Nullable synchronized GridCacheVersion futureVersion() { + return futVer; + } + + /** + * @param nodeId Left node ID. + */ + void onNodeLeft(UUID nodeId) { + GridNearAtomicUpdateResponse res = null; + + synchronized (this) { + GridNearAtomicUpdateRequest req; + + if (singleReq != null) + req = singleReq.nodeId().equals(nodeId) ? singleReq : null; + else + req = mappings != null ? mappings.get(nodeId) : null; + + if (req != null) { + res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), + cctx.deploymentEnabled()); + + ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + + "before response is received: " + nodeId); + + e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); + + res.addFailedKeys(req.keys(), e); + } + } + + if (res != null) + onResult(nodeId, res, true); + } + + /** + * @param nodeId Node ID. + * @param res Response. + * @param nodeErr {@code True} if response was created on node failure. + */ + void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + GridNearAtomicUpdateRequest req; + + AffinityTopologyVersion remapTopVer = null; + + GridCacheReturn opRes0 = null; + CachePartialUpdateCheckedException err0 = null; + + boolean rcvAll; + + GridFutureAdapter<?> fut0 = null; + + synchronized (this) { + if (!res.futureVersion().equals(futVer)) + return; + + if (singleReq != null) { + if (!singleReq.nodeId().equals(nodeId)) + return; + + req = singleReq; + + singleReq = null; + + rcvAll = true; + } + else { + req = mappings != null ? 
mappings.remove(nodeId) : null; + + if (req != null) + rcvAll = mappings.isEmpty(); + else + return; + } + + assert req != null && req.topologyVersion().equals(topVer) : req; + + if (res.remapKeys() != null) { + assert !fastMap || cctx.kernalContext().clientNode(); + + remapKey = res.remapKeys().iterator().next(); + + if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) + mapErrTopVer = req.topologyVersion(); + } + else if (res.error() != null) { + if (res.failedKeys() != null) + addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); + } + else { + if (!req.fastMap() || req.hasPrimary()) { + GridCacheReturn ret = res.returnValue(); + + if (op == TRANSFORM) { + if (ret != null) + addInvokeResults(ret); + } + else + opRes = ret; + } + } + + if (rcvAll) { + if (remapKey != null) { + assert mapErrTopVer != null; + + remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); + } + else { + if (err != null && + X.hasCause(err, CachePartialUpdateCheckedException.class) && + X.hasCause(err, ClusterTopologyCheckedException.class) && + storeFuture() && + --remapCnt > 0) { + ClusterTopologyCheckedException topErr = + X.cause(err, ClusterTopologyCheckedException.class); + + if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { + CachePartialUpdateCheckedException cause = + X.cause(err, CachePartialUpdateCheckedException.class); + + assert cause != null && cause.topologyVersion() != null : err; + + remapTopVer = + new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); + + err = null; + + Object failedKey = cause.failedKeys().iterator().next(); + + remapKey = cctx.toCacheKeyObject(failedKey); + + updVer = null; + } + } + } + + if (remapTopVer == null) { + err0 = err; + opRes0 = opRes; + } + else { + fut0 = topCompleteFut; + + topCompleteFut = null; + + cctx.mvcc().removeAtomicFuture(futVer); + + futVer = null; + topVer = AffinityTopologyVersion.ZERO; + } + } + } + + if (res.error() != 
null && res.failedKeys() == null) { + onDone(res.error()); + + return; + } + + if (!nodeErr && res.remapKeys() == null) + updateNear(req, res); + + if (remapTopVer != null) { + if (fut0 != null) + fut0.onDone(); + + if (!waitTopFut) { + onDone(new GridCacheTryPutFailedException()); + + return; + } + + if (topLocked) { + assert remapKey != null; + + CachePartialUpdateCheckedException e = + new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( + "Failed to update keys, topology changed while execute atomic update inside transaction."); + + cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); + + e.add(Collections.singleton(remapKey), cause); + + onDone(e); + + return; + } + + IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer); + + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + try { + AffinityTopologyVersion topVer = fut.get(); + + map(topVer); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + }); + + return; + } + + if (rcvAll) + onDone(opRes0, err0); + } + + /** + * @param req Request. + * @param e Error. + */ + void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (this) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion(), + cctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + onResult(req.nodeId(), res, true); + } + } + + /** + * @param topVer Topology version. 
+ */ + void map(AffinityTopologyVersion topVer) { + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); + + if (F.isEmpty(topNodes)) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); + + return; + } + + Exception err = null; + GridNearAtomicUpdateRequest singleReq0 = null; + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; + + synchronized (this) { + assert futVer == null : this; + assert this.topVer == AffinityTopologyVersion.ZERO : this; + + this.topVer = topVer; + + futVer = cctx.versions().next(topVer); + + if (storeFuture()) { + if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) { + assert isDone() : GridNearAtomicSingleUpdateFuture.this; + + return; + } + } + + // Assign version on near node in CLOCK ordering mode even if fastMap is false. + if (updVer == null) + updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; + + if (updVer != null && log.isDebugEnabled()) + log.debug("Assigned fast-map version for update on near node: " + updVer); + + try { + if (fastMap) { + pendingMappings = mapUpdate(topNodes); + + if (pendingMappings.size() == 1) + singleReq0 = singleReq = F.firstValue(pendingMappings); + else { + if (syncMode == PRIMARY_SYNC) { + mappings = U.newHashMap(pendingMappings.size()); + + for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { + if (req.hasPrimary()) + mappings.put(req.nodeId(), req); + } + } + else + mappings = new HashMap<>(pendingMappings); + + assert !mappings.isEmpty() : GridNearAtomicSingleUpdateFuture.this; + } + } + else + singleReq0 = singleReq = mapSingleUpdate(); + + remapKey = null; + } + catch (Exception e) { + err = e; + } + } + + if (err != null) { + onDone(err); + + return; + } + + // Optimize mapping for single key. 
+ if (singleReq0 != null) + mapSingle(singleReq0.nodeId(), singleReq0); + else { + assert pendingMappings != null; + + doUpdate(pendingMappings); + } + } + + /** + * @param topVer Topology version. + * @return Future. + */ + @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { + if (this.topVer == AffinityTopologyVersion.ZERO) + return null; + + if (this.topVer.compareTo(topVer) < 0) { + if (topCompleteFut == null) + topCompleteFut = new GridFutureAdapter<>(); + + return topCompleteFut; + } + + return null; + } + + /** + * @return Future version. + */ + GridCacheVersion onFutureDone() { + GridCacheVersion ver0; + + GridFutureAdapter<Void> fut0; + + synchronized (this) { + fut0 = topCompleteFut; + + topCompleteFut = null; + + ver0 = futVer; + + futVer = null; + } + + if (fut0 != null) + fut0.onDone(); + + return ver0; + } + + /** + * @param topNodes Cache nodes. + * @return Mapping. + * @throws Exception If failed. + */ + private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { + assert fastMap; + + Object key = GridNearAtomicSingleUpdateFuture.this.key; + Object val = GridNearAtomicSingleUpdateFuture.this.val; + + if (val == null && op != GridCacheOperation.DELETE) + return Collections.emptyMap(); + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + + if (affNodes.isEmpty()) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(affNodes.size()); + + int i = 0; + + for (ClusterNode affNode : affNodes) { + if (affNode == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid)."); + + UUID nodeId = affNode.id(); + + 
GridNearAtomicUpdateRequest mapped = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + true); + + pendingMappings.put(nodeId, mapped); + + mapped.addSingleUpdate(cacheKey, val, i == 0); + + i++; + } + + return pendingMappings; + } + + /** + * @return Request. + * @throws Exception If failed. + */ + private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { + Object key = GridNearAtomicSingleUpdateFuture.this.key; + Object val = GridNearAtomicSingleUpdateFuture.this.val; + + // We still can get here if user pass map with single element. + if (key == null) + throw new NullPointerException("Null key."); + + if (val == null && op != GridCacheOperation.DELETE) + throw new NullPointerException("Null value."); + + KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + + if (op != TRANSFORM) + val = cctx.toCacheObject(val); + + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); + + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid)."); + + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( + cctx.cacheId(), + primary.id(), + futVer, + fastMap, + updVer, + topVer, + topLocked, + syncMode, + op, + retval, + expiryPlc, + invokeArgs, + filter, + subjId, + taskNameHash, + skipStore, + cctx.kernalContext().clientNode(), + cctx.deploymentEnabled(), + true); + + req.addSingleUpdate(cacheKey, + val, + true); + + return req; + } + + /** + * @param ret Result from single node. 
+ */ + @SuppressWarnings("unchecked") + private void addInvokeResults(GridCacheReturn ret) { + assert op == TRANSFORM : op; + assert ret.value() == null || ret.value() instanceof Map : ret.value(); + + if (ret.value() != null) { + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else + opRes = ret; + } + } + + /** + * @param failedKeys Failed keys. + * @param topVer Topology version for failed update. + * @param err Error cause. + */ + private void addFailedKeys(Collection<KeyCacheObject> failedKeys, + AffinityTopologyVersion topVer, + Throwable err) { + CachePartialUpdateCheckedException err0 = this.err; + + if (err0 == null) + err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); + + Collection<Object> keys = new ArrayList<>(failedKeys.size()); + + for (KeyCacheObject key : failedKeys) + keys.add(key.value(cctx.cacheObjectContext(), false)); + + err0.add(keys, err, topVer); + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return S.toString(UpdateState.class, this); + } + } + + /** {@inheritDoc} */ + public String toString() { + return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ae662c8..04557a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -526,6 +526,20 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> onDone(new GridCacheReturn(cctx, true, null, true)); } + @Override + public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse res) { + assert false; + + throw new UnsupportedOperationException(); + } + + @Override + public void onResult(UUID nodeId) { + assert false; + + throw new UnsupportedOperationException(); + } + /** * */ @@ -1051,7 +1065,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> taskNameHash, skipStore, cctx.kernalContext().clientNode(), - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + false); pendingMappings.put(nodeId, mapped); } @@ -1144,7 +1159,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> taskNameHash, skipStore, cctx.kernalContext().clientNode(), - cctx.deploymentEnabled()); + cctx.deploymentEnabled(), + false); req.addUpdateEntry(cacheKey, val, http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index 33fa4bd..784911d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -89,6 +89,16 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri 
/** Update operation. */ private GridCacheOperation op; + /** */ + private KeyCacheObject singleKey; + + /** */ + private CacheObject singleVal; + + /** */ + @GridDirectTransient + private EntryProcessor singleEntryProcessor; + /** Keys to update. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) @@ -106,6 +116,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri @GridDirectCollection(byte[].class) private List<byte[]> entryProcessorsBytes; + private byte[] singleEntryProcessorsBytes; + /** Optional arguments for entry processor. */ @GridDirectTransient private Object[] invokeArgs; @@ -198,7 +210,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri int taskNameHash, boolean skipStore, boolean clientReq, - boolean addDepInfo + boolean addDepInfo, + boolean single ) { assert futVer != null; @@ -222,7 +235,12 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri this.clientReq = clientReq; this.addDepInfo = addDepInfo; - keys = new ArrayList<>(); + if (!single) + keys = new ArrayList<>(); + } + + public boolean singleUpdate() { + return singleKey != null; } /** {@inheritDoc} */ @@ -335,6 +353,22 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return skipStore; } + public void addSingleUpdate(KeyCacheObject key, @Nullable Object val, boolean primary) { + assert val != null || op == DELETE; + + if (op == TRANSFORM) { + assert val instanceof EntryProcessor : val; + + singleEntryProcessor = (EntryProcessor<Object, Object, Object>)val; + } + else + singleVal = (CacheObject)val; + + singleKey = key; + + hasPrimary = primary; + } + /** * @param key Key to add. * @param val Optional update value. 
@@ -415,6 +449,20 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri } } + public KeyCacheObject singleKey() { + assert singleKey != null; + + return singleKey; + } + + public CacheObject singleWriteValue() { + return singleVal; + } + + public EntryProcessor singleEntryProcessor() { + return singleEntryProcessor; + } + /** * @return Keys for this update request. */ @@ -539,8 +587,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(keys, cctx); - if (filter != null) { boolean hasFilter = false; @@ -559,17 +605,40 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri if (expiryPlc != null) expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); - if (op == TRANSFORM) { - // force addition of deployment info for entry processors if P2P is enabled globally. - if (!addDepInfo && ctx.deploymentEnabled()) - addDepInfo = true; + if (singleKey != null) { + prepareMarshalCacheObject(singleKey, cctx); - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + if (op == TRANSFORM) { + // TODO + assert false; - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + // force addition of deployment info for entry processors if P2P is enabled globally. + if (!addDepInfo && ctx.deploymentEnabled()) + addDepInfo = true; + + // TODO + // entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + } + else + prepareMarshalCacheObject(singleVal, cctx); + } + else { + prepareMarshalCacheObjects(keys, cctx); + + if (op == TRANSFORM) { + // force addition of deployment info for entry processors if P2P is enabled globally. 
+ if (!addDepInfo && ctx.deploymentEnabled()) + addDepInfo = true; + + entryProcessorsBytes = marshalCollection(entryProcessors, cctx); + + invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); + } + else + prepareMarshalCacheObjects(vals, cctx); } - else - prepareMarshalCacheObjects(vals, cctx); } /** {@inheritDoc} */ @@ -578,12 +647,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri GridCacheContext cctx = ctx.cacheContext(cacheId); - finishUnmarshalCacheObjects(keys, cctx, ldr); + if (singleKey != null) { + finishUnmarshalCacheObject(singleKey, cctx, ldr); - if (op == TRANSFORM) - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); - else - finishUnmarshalCacheObjects(vals, cctx, ldr); + if (op == TRANSFORM) { + // TODO + assert false; + } + else + finishUnmarshalCacheObject(singleVal, cctx, ldr); + } + else { + finishUnmarshalCacheObjects(keys, cctx, ldr); + + if (op == TRANSFORM) + entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); + else + finishUnmarshalCacheObjects(vals, cctx, ldr); + } if (filter != null) { for (CacheEntryPredicate p : filter) { @@ -703,48 +784,66 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri writer.incrementState(); case 17: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeByteArray("singleEntryProcessorsBytes", singleEntryProcessorsBytes)) return false; writer.incrementState(); case 18: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("singleKey", singleKey)) return false; writer.incrementState(); case 19: - if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1)) + if (!writer.writeMessage("singleVal", singleVal)) return false; writer.incrementState(); case 20: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 21: - if (!writer.writeBoolean("topLocked", topLocked)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 22: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 23: - if (!writer.writeMessage("updateVer", updateVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 24: + if (!writer.writeBoolean("topLocked", topLocked)) + return false; + + writer.incrementState(); + + case 25: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 26: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + case 27: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; @@ -883,7 +982,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 17: - skipStore = reader.readBoolean("skipStore"); + singleEntryProcessorsBytes = reader.readByteArray("singleEntryProcessorsBytes"); if (!reader.isLastRead()) return false; @@ -891,7 +990,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 18: - subjId = reader.readUuid("subjId"); + singleKey = reader.readMessage("singleKey"); if (!reader.isLastRead()) return false; @@ -899,6 +998,30 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); case 19: + singleVal = reader.readMessage("singleVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); 
+ + case 20: + skipStore = reader.readBoolean("skipStore"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 21: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -910,7 +1033,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 20: + case 23: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -918,7 +1041,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 21: + case 24: topLocked = reader.readBoolean("topLocked"); if (!reader.isLastRead()) @@ -926,7 +1049,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 22: + case 25: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -934,7 +1057,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 23: + case 26: updateVer = reader.readMessage("updateVer"); if (!reader.isLastRead()) @@ -942,7 +1065,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri reader.incrementState(); - case 24: + case 27: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -962,7 +1085,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 25; + return 28; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1d310860/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1bf03a9..f6daa3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -150,14 +150,14 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); - for (int i = 0; i < req.keys().size(); i++) { - if (F.contains(skipped, i)) - continue; + if (req.singleUpdate()) { + if (F.contains(skipped, 0)) + return; - KeyCacheObject key = req.keys().get(i); + KeyCacheObject key = req.singleKey(); if (F.contains(failed, key)) - continue; + return; if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. 
GridCacheEntryEx entry = peekEx(key); @@ -165,25 +165,22 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { if (entry != null && entry.markObsolete(ver)) removeEntry(entry); - continue; + return; } CacheObject val = null; - if (F.contains(nearValsIdxs, i)) { + if (F.contains(nearValsIdxs, 0)) val = res.nearValue(nearValIdx); - - nearValIdx++; - } else { assert req.operation() != TRANSFORM; if (req.operation() != DELETE) - val = req.value(i); + val = req.singleWriteValue(); } - long ttl = res.nearTtl(i); - long expireTime = res.nearExpireTime(i); + long ttl = res.nearTtl(0); + long expireTime = res.nearExpireTime(0); if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) expireTime = CU.toExpireTime(ttl); @@ -203,6 +200,61 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); } } + else { + for (int i = 0; i < req.keys().size(); i++) { + if (F.contains(skipped, i)) + continue; + + KeyCacheObject key = req.keys().get(i); + + if (F.contains(failed, key)) + continue; + + if (ctx.affinity().belongs(ctx.localNode(), ctx.affinity().partition(key), req.topologyVersion())) { // Reader became backup. 
+ GridCacheEntryEx entry = peekEx(key); + + if (entry != null && entry.markObsolete(ver)) + removeEntry(entry); + + continue; + } + + CacheObject val = null; + + if (F.contains(nearValsIdxs, i)) { + val = res.nearValue(nearValIdx); + + nearValIdx++; + } + else { + assert req.operation() != TRANSFORM; + + if (req.operation() != DELETE) + val = req.value(i); + } + + long ttl = res.nearTtl(i); + long expireTime = res.nearExpireTime(i); + + if (ttl != CU.TTL_NOT_CHANGED && expireTime == CU.EXPIRE_TIME_CALCULATE) + expireTime = CU.toExpireTime(ttl); + + try { + processNearAtomicUpdateResponse(ver, + key, + val, + null, + ttl, + expireTime, + req.nodeId(), + req.subjectId(), + taskName); + } + catch (IgniteCheckedException e) { + res.addFailedKey(key, new IgniteCheckedException("Failed to update key in near cache: " + key, e)); + } + } + } } /**
