IGNITE-1525 Return value for cache operation can be lost with onePhaseCommit
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b72d18d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b72d18d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b72d18d Branch: refs/heads/ignite-comm-balance Commit: 9b72d18dd94ec1383653f00474c102804c02790a Parents: c3eff6b Author: Anton Vinogradov <[email protected]> Authored: Mon Sep 19 18:07:20 2016 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 19 18:07:20 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 12 + .../communication/GridIoMessageFactory.java | 6 + .../GridCacheReturnCompletableWrapper.java | 101 +++++++++ .../cache/GridDeferredAckMessageSender.java | 219 ++++++++++++++++++ .../GridDistributedTxRemoteAdapter.java | 59 +++-- .../distributed/dht/GridDhtTxFinishFuture.java | 12 +- .../distributed/dht/GridDhtTxFinishRequest.java | 33 ++- .../dht/GridDhtTxFinishResponse.java | 52 ++++- .../dht/GridDhtTxOnePhaseCommitAckRequest.java | 134 +++++++++++ .../distributed/dht/GridDhtTxPrepareFuture.java | 6 +- .../dht/GridDhtTxPrepareRequest.java | 93 +++++--- .../cache/distributed/dht/GridDhtTxRemote.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 227 +++++-------------- ...arOptimisticSerializableTxPrepareFuture.java | 4 +- .../near/GridNearOptimisticTxPrepareFuture.java | 7 +- .../GridNearPessimisticTxPrepareFuture.java | 4 +- .../near/GridNearTxFinishFuture.java | 112 +++++++-- .../cache/transactions/IgniteTxAdapter.java | 46 +++- .../cache/transactions/IgniteTxHandler.java | 163 ++++++++++--- .../transactions/IgniteTxLocalAdapter.java | 19 +- .../cache/transactions/IgniteTxManager.java | 154 ++++++++++++- .../IgniteCachePutRetryAbstractSelfTest.java | 39 +++- ...gniteCachePutRetryTransactionalSelfTest.java | 75 +++++- .../config/benchmark-client-mode.properties | 2 + .../config/benchmark-tx-win.properties | 2 + .../yardstick/config/benchmark-tx.properties | 2 + .../yardstick/config/benchmark-win.properties | 2 + modules/yardstick/config/benchmark.properties | 2 + .../cache/IgniteGetAndPutBenchmark.java | 41 ++++ .../cache/IgniteGetAndPutTxBenchmark.java | 70 ++++++ .../cache/IgniteInvokeTxBenchmark.java | 40 ++++ 31 files changed, 1405 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 7c428a6..ab6403f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -290,6 +290,18 @@ public final class IgniteSystemProperties { public static final String IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT = "IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT"; /** + * One phase commit deferred ack request timeout. + */ + public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT = + "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT"; + + /** + * One phase commit deferred ack request buffer size. + */ + public static final String IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE = + "IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE"; + + /** * If this property set then debug console will be opened for H2 indexing SPI. */ public static final String IGNITE_H2_DEBUG_CONSOLE = "IGNITE_H2_DEBUG_CONSOLE"; http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 5f60215..8b8a734 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest; @@ -160,6 +161,11 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -27: + msg = new GridDhtTxOnePhaseCommitAckRequest(); + + break; + case -26: msg = new TxLockList(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java new file mode 100644 index 0000000..8ceaf71 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturnCompletableWrapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * Provides initialized GridCacheReturn. + */ +public class GridCacheReturnCompletableWrapper { + /** Completable wrapper upd. */ + private static final AtomicReferenceFieldUpdater<GridCacheReturnCompletableWrapper, Object> COMPLETABLE_WRAPPER_UPD = + AtomicReferenceFieldUpdater.newUpdater(GridCacheReturnCompletableWrapper.class, Object.class, "o"); + + /** */ + private volatile Object o; + + /** Node id. */ + private final UUID nodeId; + + /** + * @param nodeId Node id. + */ + public GridCacheReturnCompletableWrapper(UUID nodeId) { + this.nodeId = nodeId; + } + + /** + * @return ID of node initiated tx or {@code null} if this node is local. + */ + @Nullable public UUID nodeId() { + return nodeId; + } + + /** + * Marks as initialized. + * + * @param ret Return. + */ + public void initialize(GridCacheReturn ret) { + final Object obj = this.o; + + if (obj == null) { + boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, ret); + + if (!res) + initialize(ret); + } + else if (obj instanceof GridFutureAdapter) { + ((GridFutureAdapter)obj).onDone(ret); + + boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, obj, ret); + + assert res; + } + else + throw new IllegalStateException("GridCacheReturnCompletableWrapper can't be reinitialized"); + } + + /** + * Allows wait for properly initialized value. + */ + public IgniteInternalFuture<GridCacheReturn> fut() { + final Object obj = this.o; + + if (obj instanceof GridCacheReturn) + return new GridFinishedFuture<>((GridCacheReturn)obj); + else if (obj instanceof IgniteInternalFuture) + return (IgniteInternalFuture)obj; + else if (obj == null) { + boolean res = COMPLETABLE_WRAPPER_UPD.compareAndSet(this, null, new GridFutureAdapter<>()); + + if (res) + return (IgniteInternalFuture)this.o; + else + return fut(); + } + else + throw new IllegalStateException(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java new file mode 100644 index 0000000..7145dc2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.closure.GridClosureProcessor; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedDeque8; + +/** + * + */ +public abstract class GridDeferredAckMessageSender { + /** Deferred message buffers. */ + private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>(); + + /** Timeout processor. */ + private GridTimeoutProcessor time; + + /** Closure processor. */ + public GridClosureProcessor closure; + + /** + * @param time Time. + * @param closure Closure. + */ + public GridDeferredAckMessageSender(GridTimeoutProcessor time, + GridClosureProcessor closure) { + this.time = time; + this.closure = closure; + } + + /** + * + */ + public abstract int getTimeout(); + + /** + * + */ + public abstract int getBufferSize(); + + /** + * + */ + public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers); + + /** + * + */ + public void stop() { + for (DeferredAckMessageBuffer buf : deferredAckMsgBuffers.values()) + buf.finish0(); + } + + /** + * @param nodeId Node ID to send message to. + * @param ver Version to ack. + */ + public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) { + while (true) { + DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId); + + if (buf == null) { + buf = new DeferredAckMessageBuffer(nodeId); + + DeferredAckMessageBuffer old = deferredAckMsgBuffers.putIfAbsent(nodeId, buf); + + if (old == null) { + // We have successfully added buffer to map. + time.addTimeoutObject(buf); + } + else + buf = old; + } + + if (!buf.add(ver)) + // Some thread is sending filled up buffer, we can remove it. + deferredAckMsgBuffers.remove(nodeId, buf); + else + break; + } + } + + /** + * Deferred message buffer. + */ + private class DeferredAckMessageBuffer extends ReentrantReadWriteLock implements GridTimeoutObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Filled atomic flag. */ + private AtomicBoolean guard = new AtomicBoolean(false); + + /** Versions. */ + private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>(); + + /** Node ID. */ + private final UUID nodeId; + + /** Timeout ID. */ + private final IgniteUuid timeoutId; + + /** End time. */ + private final long endTime; + + /** + * @param nodeId Node ID to send message to. + */ + private DeferredAckMessageBuffer(UUID nodeId) { + this.nodeId = nodeId; + + timeoutId = IgniteUuid.fromUuid(nodeId); + + endTime = U.currentTimeMillis() + getTimeout(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return timeoutId; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (guard.compareAndSet(false, true)) { + closure.runLocalSafe(new Runnable() { + @Override public void run() { + writeLock().lock(); + + try { + finish0(); + } + finally { + writeLock().unlock(); + } + } + }); + } + } + + /** + * Adds deferred request to buffer. + * + * @param ver Version to send. + * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used. + */ + public boolean add(GridCacheVersion ver) { + readLock().lock(); + + boolean snd = false; + + try { + if (guard.get()) + return false; + + vers.add(ver); + + if (vers.sizex() > getBufferSize() && guard.compareAndSet(false, true)) + snd = true; + } + finally { + readLock().unlock(); + } + + if (snd) { + // Wait all threads in read lock to finish. + writeLock().lock(); + + try { + finish0(); + + time.removeTimeoutObject(this); + } + finally { + writeLock().unlock(); + } + } + + return true; + } + + /** + * Sends deferred notification message and removes this buffer from pending responses map. + */ + private void finish0() { + finish(nodeId, vers); + + deferredAckMsgBuffers.remove(nodeId, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 9d9862a..4adfa8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; @@ -448,7 +450,25 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + GridCacheReturnCompletableWrapper wrapper = null; + if (!F.isEmpty(writeMap)) { + GridCacheReturn ret = null; + + if (!near() && !local() && onePhaseCommit()) { + if (needReturnValue()) { + ret = new GridCacheReturn(null, cctx.localNodeId().equals(otherNodeId()), true, null, true); + + UUID origNodeId = otherNodeId(); // Originating node. + + cctx.tm().addCommittedTxReturn(this, + wrapper = new GridCacheReturnCompletableWrapper( + !cctx.localNodeId().equals(origNodeId) ? origNodeId : null)); + } + else + cctx.tm().addCommittedTx(this, this.nearXidVersion(), null); + } + // Register this transaction as completed prior to write-phase to // ensure proper lock ordering for removed entries. cctx.tm().addCommittedTx(this); @@ -457,13 +477,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter batchStoreCommit(writeMap().values()); - // Node that for near transactions we grab all entries. - for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { - GridCacheContext cacheCtx = txEntry.context(); + try { + // Node that for near transactions we grab all entries. + for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { + GridCacheContext cacheCtx = txEntry.context(); - boolean replicate = cacheCtx.isDrEnabled(); + boolean replicate = cacheCtx.isDrEnabled(); - try { while (true) { try { GridCacheEntryEx cached = txEntry.cached(); @@ -486,7 +506,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter txEntry.cached().unswap(false); IgniteBiTuple<GridCacheOperation, CacheObject> res = - applyTransformClosures(txEntry, false); + applyTransformClosures(txEntry, false, ret); GridCacheOperation op = res.get1(); CacheObject val = res.get2(); @@ -672,21 +692,26 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } } - catch (Throwable ex) { - // In case of error, we still make the best effort to commit, - // as there is no way to rollback at this point. - err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + - "(all transaction entries will be invalidated): " + CU.txString(this), ex); + } + catch (Throwable ex) { + // In case of error, we still make the best effort to commit, + // as there is no way to rollback at this point. + err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + + "(all transaction entries will be invalidated): " + CU.txString(this), ex); - U.error(log, "Commit failed.", err); + U.error(log, "Commit failed.", err); - uncommit(); + uncommit(); - state(UNKNOWN); + state(UNKNOWN); - if (ex instanceof Error) - throw (Error)ex; - } + if (ex instanceof Error) + throw (Error)ex; + + } + finally { + if (wrapper != null) + wrapper.initialize(ret); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index d2e26b4..ac2ab41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -351,7 +351,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.size(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + false, + false); try { cctx.io().send(n, req, tx.ioPolicy()); @@ -448,7 +450,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash(), tx.activeCachesDeploymentEnabled(), - updCntrs); + updCntrs, + false, + false); req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); @@ -516,7 +520,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.size(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + false, + false); req.writeVersion(tx.writeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 2d98e0d..c618a18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -46,6 +46,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** */ public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01; + /** */ + public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02; + /** Near node ID. */ private UUID nearNodeId; @@ -141,7 +144,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { int txSize, @Nullable UUID subjId, int taskNameHash, - boolean addDepInfo + boolean addDepInfo, + boolean retVal, + boolean waitRemoteTxs ) { super( xidVer, @@ -172,6 +177,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { this.sysInvalidate = sysInvalidate; this.subjId = subjId; this.taskNameHash = taskNameHash; + + needReturnValue(retVal); + waitRemoteTransactions(waitRemoteTxs); } /** @@ -224,11 +232,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { @Nullable UUID subjId, int taskNameHash, boolean addDepInfo, - Collection<Long> updateIdxs + Collection<Long> updateIdxs, + boolean retVal, + boolean waitRemoteTxs ) { this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc, sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize, - subjId, taskNameHash, addDepInfo); + subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); @@ -339,6 +349,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { flags &= ~WAIT_REMOTE_TX_FLAG_MASK; } + /** + * @return Flag indicating whether transaction needs return value. + */ + public boolean needReturnValue() { + return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0; + } + + /** + * @param retVal Need return value. + */ + public void needReturnValue(boolean retVal) { + if (retVal) + flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK); + else + flags &= ~NEED_RETURN_VALUE_FLAG_MASK; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index 78dc16f..0618172 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -19,9 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.nio.ByteBuffer; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -51,6 +52,9 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** Flag indicating if this is a check-committed response. */ private boolean checkCommitted; + /** Cache return value. */ + private GridCacheReturn retVal; + /** * Empty constructor required by {@link Externalizable}. */ @@ -112,6 +116,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { if (checkCommittedErr != null && checkCommittedErrBytes == null) checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr); + + if (retVal != null && retVal.cacheId() != 0) { + GridCacheContext cctx = ctx.cacheContext(retVal.cacheId()); + + assert cctx != null : retVal.cacheId(); + + retVal.prepareMarshal(cctx); + } } /** {@inheritDoc} */ @@ -121,6 +133,28 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { if (checkCommittedErrBytes != null && checkCommittedErr == null) checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (retVal != null && retVal.cacheId() != 0) { + GridCacheContext cctx = ctx.cacheContext(retVal.cacheId()); + + assert cctx != null : retVal.cacheId(); + + retVal.finishUnmarshal(cctx, ldr); + } + } + + /** + * @param retVal Return value. + */ + public void returnValue(GridCacheReturn retVal) { + this.retVal = retVal; + } + + /** + * @return Return value. + */ + public GridCacheReturn returnValue() { + return retVal; } /** {@inheritDoc} */ @@ -161,6 +195,12 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { writer.incrementState(); + case 8: + if (!writer.writeMessage("retVal", retVal)) + return false; + + writer.incrementState(); + } return true; @@ -201,6 +241,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { reader.incrementState(); + case 8: + retVal = reader.readMessage("retVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDhtTxFinishResponse.class); @@ -213,6 +261,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 9; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java new file mode 100644 index 0000000..0c8ae69 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxOnePhaseCommitAckRequest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.nio.ByteBuffer; +import java.util.Collection; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * One Phase Commit Near transaction ack request. + */ +public class GridDhtTxOnePhaseCommitAckRequest extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Lock or transaction versions. */ + @GridToStringInclude + @GridDirectCollection(GridCacheVersion.class) + protected Collection<GridCacheVersion> vers; + + /** + * Default constructor. + */ + public GridDhtTxOnePhaseCommitAckRequest() { + // No-op. + } + + /** + * + * @param vers Near Tx xid Versions. + */ + public GridDhtTxOnePhaseCommitAckRequest(Collection<GridCacheVersion> vers) { + this.vers = vers; + } + + /** + * @return Version. + */ + public Collection<GridCacheVersion> versions() { + return vers; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxOnePhaseCommitAckRequest.class, this, super.toString()); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeCollection("vers", vers, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + vers = reader.readCollection("vers", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtTxOnePhaseCommitAckRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -27; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index ec73bff..1dbda69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1245,7 +1245,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.onePhaseCommit(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + retVal); int idx = 0; @@ -1356,7 +1357,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.onePhaseCommit(), tx.subjectId(), tx.taskNameHash(), - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + retVal); for (IgniteTxEntry entry : nearMapping.entries()) { if (CU.writes().apply(entry)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index 1cdc96f..a8f2087 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -52,6 +52,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** */ private static final long serialVersionUID = 0L; + /** */ + public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01; + /** Max order. */ private UUID nearNodeId; @@ -100,6 +103,9 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Preload keys. */ private BitSet preloadKeys; + /** */ + private byte flags; + /** * Empty constructor required for {@link Externalizable}. */ @@ -118,6 +124,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param txNodes Transaction nodes mapping. * @param nearXidVer Near transaction ID. * @param last {@code True} if this is last prepare request for node. + * @param retVal Need return value flag. * @param addDepInfo Deployment info flag. */ public GridDhtTxPrepareRequest( @@ -134,7 +141,8 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean onePhaseCommit, UUID subjId, int taskNameHash, - boolean addDepInfo) { + boolean addDepInfo, + boolean retVal) { super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo); assert futId != null; @@ -149,12 +157,31 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { this.subjId = subjId; this.taskNameHash = taskNameHash; + needReturnValue(retVal); + invalidateNearEntries = new BitSet(dhtWrites == null ? 0 : dhtWrites.size()); nearNodeId = tx.nearNodeId(); } /** + * @return Flag indicating whether transaction needs return value. + */ + public boolean needReturnValue() { + return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0; + } + + /** + * @param retVal Need return value. + */ + public void needReturnValue(boolean retVal) { + if (retVal) + flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK); + else + flags &= ~NEED_RETURN_VALUE_FLAG_MASK; + } + + /** * @return {@code True} if this is last prepare request for node. */ public boolean last() { @@ -348,78 +375,84 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { switch (writer.state()) { case 23: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 24: - if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 25: - if (!writer.writeBoolean("last", last)) + if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); case 26: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); case 27: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 28: - if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 29: - if (!writer.writeMessage("nearXidVer", nearXidVer)) + if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 30: - if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); case 31: - if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 32: - if (!writer.writeBitSet("preloadKeys", preloadKeys)) + if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 33: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); case 34: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 35: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 36: if (!writer.writeMessage("topVer", topVer)) return false; @@ -442,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { switch (reader.state()) { case 23: - futId = reader.readIgniteUuid("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -450,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 24: - invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -458,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 25: - last = reader.readBoolean("last"); + invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) return false; @@ -466,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 26: - miniId = reader.readIgniteUuid("miniId"); + last = reader.readBoolean("last"); if (!reader.isLastRead()) return false; @@ -474,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 27: - nearNodeId = reader.readUuid("nearNodeId"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -482,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 28: - nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -490,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 29: - nearXidVer = reader.readMessage("nearXidVer"); + nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -498,7 +531,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 30: - ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); + nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) return false; @@ -506,7 +539,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 31: - ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); + ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -514,7 +547,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 32: - preloadKeys = reader.readBitSet("preloadKeys"); + ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -522,7 +555,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 33: - subjId = reader.readUuid("subjId"); + preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) return false; @@ -530,7 +563,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 34: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -538,6 +571,14 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); case 35: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 36: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -557,6 +598,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 36; + return 37; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index dc27eb1..6ad20c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -189,9 +189,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { commitVer, sys, plc, - concurrency, - isolation, - invalidate, + concurrency, + isolation, + invalidate, timeout, txSize, subjId, http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1e45fa7..30a3d57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -29,9 +29,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import javax.cache.processor.EntryProcessorResult; @@ -60,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; +import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -82,7 +80,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -102,11 +99,9 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteOutClosure; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE; @@ -144,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; /** Pending */ - private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>(); + private GridDeferredAckMessageSender deferredUpdateMessageSender; /** */ private GridNearAtomicCache<K, V> near; @@ -240,6 +235,53 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public void start() throws IgniteCheckedException { super.start(); + deferredUpdateMessageSender = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { + @Override public int getTimeout() { + return DEFERRED_UPDATE_RESPONSE_TIMEOUT; + } + + @Override public int getBufferSize() { + return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE; + } + + @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { + GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), + vers, ctx.deploymentEnabled()); + + try { + ctx.kernalContext().gateway().readLock(); + + try { + ctx.io().send(nodeId, msg, ctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + + ", node=" + nodeId + ']'); + } + } + finally { + ctx.kernalContext().gateway().readUnlock(); + } + } + catch (IllegalStateException ignored) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } + } + catch (ClusterTopologyCheckedException ignored) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send deferred DHT update response, node left [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send deferred DHT update response to remote node [" + + "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); + } + } + }; + CacheMetricsImpl m = new CacheMetricsImpl(ctx); if (ctx.dht().near() != null) @@ -405,8 +447,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public void stop() { - for (DeferredResponseBuffer buf : pendingResponses.values()) - buf.finish(); + deferredUpdateMessageSender.stop(); } /** @@ -3208,28 +3249,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param ver Version to ack. */ private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) { - while (true) { - DeferredResponseBuffer buf = pendingResponses.get(nodeId); - - if (buf == null) { - buf = new DeferredResponseBuffer(nodeId); - - DeferredResponseBuffer old = pendingResponses.putIfAbsent(nodeId, buf); - - if (old == null) { - // We have successfully added buffer to map. - ctx.time().addTimeoutObject(buf); - } - else - buf = old; - } - - if (!buf.addResponse(ver)) - // Some thread is sending filled up buffer, we can remove it. - pendingResponses.remove(nodeId, buf); - else - break; - } + deferredUpdateMessageSender.sendDeferredAckMessage(nodeId, ver); } /** @@ -3452,149 +3472,4 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return Collections.emptyList(); } } - - /** - * Deferred response buffer. - */ - private class DeferredResponseBuffer extends ReentrantReadWriteLock implements GridTimeoutObject { - /** */ - private static final long serialVersionUID = 0L; - - /** Filled atomic flag. */ - private AtomicBoolean guard = new AtomicBoolean(false); - - /** Response versions. */ - private ConcurrentLinkedDeque8<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>(); - - /** Node ID. */ - private final UUID nodeId; - - /** Timeout ID. */ - private final IgniteUuid timeoutId; - - /** End time. */ - private final long endTime; - - /** - * @param nodeId Node ID to send message to. - */ - private DeferredResponseBuffer(UUID nodeId) { - this.nodeId = nodeId; - - timeoutId = IgniteUuid.fromUuid(nodeId); - - endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT; - } - - /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return timeoutId; - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return endTime; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (guard.compareAndSet(false, true)) { - ctx.closures().runLocalSafe(new Runnable() { - @Override public void run() { - writeLock().lock(); - - try { - finish(); - } - finally { - writeLock().unlock(); - } - } - }); - } - } - - /** - * Adds deferred response to buffer. - * - * @param ver Version to send. - * @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used. - */ - public boolean addResponse(GridCacheVersion ver) { - readLock().lock(); - - boolean snd = false; - - try { - if (guard.get()) - return false; - - respVers.add(ver); - - if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true)) - snd = true; - } - finally { - readLock().unlock(); - } - - if (snd) { - // Wait all threads in read lock to finish. - writeLock().lock(); - - try { - finish(); - - ctx.time().removeTimeoutObject(this); - } - finally { - writeLock().unlock(); - } - } - - return true; - } - - /** - * Sends deferred notification message and removes this buffer from pending responses map. - */ - private void finish() { - GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), - respVers, ctx.deploymentEnabled()); - - try { - ctx.kernalContext().gateway().readLock(); - - try { - ctx.io().send(nodeId, msg, ctx.ioPolicy()); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + - ", node=" + nodeId + ']'); - } - } - finally { - ctx.kernalContext().gateway().readUnlock(); - } - } - catch (IllegalStateException ignored) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); - } - } - catch (ClusterTopologyCheckedException ignored) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Failed to send deferred DHT update response, node left [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); - } - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send deferred DHT update response to remote node [" + - "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); - } - - pendingResponses.remove(nodeId, this); - } - } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index d251528..4cbfb27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -526,7 +526,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim ) { GridCacheContext cacheCtx = entry.context(); - List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + List<ClusterNode> nodes = cacheCtx.isLocal() ? + cacheCtx.affinity().nodes(entry.key(), topVer) : + cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index e17a76c..91cfbda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -27,7 +27,6 @@ import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; @@ -599,9 +598,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa GridCacheEntryEx cached0 = entry.cached(); if (cached0.isDht()) - nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer); + nodes = cacheCtx.topology().nodes(cached0.partition(), topVer); else - nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + nodes = cacheCtx.isLocal() ? + cacheCtx.affinity().nodes(entry.key(), topVer) : + cacheCtx.topology().nodes(cacheCtx.affinity().partition(entry.key()), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 34b8281..5c09398 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -193,7 +193,9 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA GridCacheContext cacheCtx = txEntry.context(); - List<ClusterNode> nodes = cacheCtx.affinity().nodes(txEntry.key(), topVer); + List<ClusterNode> nodes = cacheCtx.isLocal() ? + cacheCtx.affinity().nodes(txEntry.key(), topVer) : + cacheCtx.topology().nodes(cacheCtx.affinity().partition(txEntry.key()), topVer); ClusterNode primary = F.first(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index bb5d482..46604c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -34,6 +34,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWrapper; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; @@ -76,6 +78,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu public static final IgniteProductVersion PRIMARY_SYNC_TXS_SINCE = IgniteProductVersion.fromString("1.6.0"); /** */ + public static final IgniteProductVersion ACK_DHT_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.6.8"); + + /** */ private static final long serialVersionUID = 0L; /** Logger reference. */ @@ -251,6 +256,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu assert f.node().id().equals(nodeId); + if (res.returnValue() != null) + tx.implicitSingleResult(res.returnValue()); + f.onDhtFinishResponse(res); } } @@ -432,6 +440,50 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu catch (IgniteCheckedException e) { onDone(e); } + finally { + if (commit && + tx.onePhaseCommit() && + !tx.writeMap().isEmpty()) // Readonly operations require no ack. + ackBackup(); + } + } + + /** + * + */ + private void ackBackup() { + if (mappings.empty()) + return; + + if (!tx.needReturnValue() || !tx.implicit()) + return; // GridCacheReturn was not saved at backup. + + GridDistributedTxMapping mapping = mappings.singleMapping(); + + if (mapping != null) { + UUID nodeId = mapping.node().id(); + + Collection<UUID> backups = tx.transactionNodes().get(nodeId); + + if (!F.isEmpty(backups)) { + assert backups.size() == 1 : backups; + + UUID backupId = F.first(backups); + + ClusterNode backup = cctx.discovery().node(backupId); + + // Nothing to do if backup has left the grid. + if (backup == null) { + // No-op. + } + else if (backup.isLocal()) + cctx.tm().removeTxReturn(tx.xidVersion()); + else { + if (ACK_DHT_ONE_PHASE_SINCE.compareToIgnoreTimestamp(backup.version()) <= 0) + cctx.tm().sendDeferredAckResponse(backupId, tx.xidVersion()); + } + } + } } /** @@ -475,23 +527,48 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu readyNearMappingFromBackup(mapping); if (committed) { - if (tx.syncMode() == FULL_SYNC) { - GridCacheVersion nearXidVer = tx.nearXidVersion(); + try { + if (tx.needReturnValue() && tx.implicit()) { + GridCacheReturnCompletableWrapper wrapper = + cctx.tm().getCommittedTxReturn(tx.xidVersion()); - assert nearXidVer != null : tx; + assert wrapper != null : tx.xidVersion(); - IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer); + GridCacheReturn retVal = wrapper.fut().get(); - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { - mini.onDone(tx); - } - }); + assert retVal != null; + + tx.implicitSingleResult(retVal); + } - return; + if (tx.syncMode() == FULL_SYNC) { + GridCacheVersion nearXidVer = tx.nearXidVersion(); + + assert nearXidVer != null : tx; + + IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer); + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + mini.onDone(tx); + } + }); + + return; + } + + mini.onDone(tx); } + catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near finish fut, failed to finish [" + + "txId=" + tx.nearXidVersion() + + ", node=" + backup.id() + + ", err=" + e + ']'); + } - mini.onDone(tx); + mini.onDone(e); + } } else { ClusterTopologyCheckedException cause = @@ -504,7 +581,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } else { - GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId()); + GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId(), false); // Preserve old behavior, otherwise response is not sent. if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0) @@ -765,9 +842,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** * @param miniId Mini future ID. + * @param waitRemoteTxs Wait for remote txs. * @return Finish request. */ - private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) { + private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId, boolean waitRemoteTxs) { GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest( cctx.localNodeId(), futureId(), @@ -791,7 +869,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu 0, null, 0, - tx.activeCachesDeploymentEnabled()); + tx.activeCachesDeploymentEnabled(), + !waitRemoteTxs && (tx.needReturnValue() && tx.implicit()), + waitRemoteTxs); finishReq.checkCommitted(true); @@ -872,9 +952,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu add(mini); - GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId()); - - req.waitRemoteTransactions(true); + GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId(), true); for (UUID backupId : backups) { ClusterNode backup = cctx.discovery().node(backupId); http://git-wip-us.apache.org/repos/asf/ignite/blob/9b72d18d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb2989e..18c3011 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; @@ -151,6 +152,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringExclude protected GridCacheSharedContext<?, ?> cctx; + /** Need return value. */ + protected boolean needRetVal; + /** * End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) * assigned to this transaction at the end of write phase. @@ -695,6 +699,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** + * @return Flag indicating whether transaction needs return value. + */ + public boolean needReturnValue() { + return needRetVal; + } + + /** + * @param needRetVal Need return value flag. + */ + public void needReturnValue(boolean needRetVal) { + this.needRetVal = needRetVal; + } + + /** * Gets remaining allowed transaction time. * * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out. @@ -1285,7 +1303,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (intercept || !F.isEmpty(e.entryProcessors())) e.cached().unswap(false); - IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false); + IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false, null); GridCacheContext cacheCtx = e.context(); @@ -1443,13 +1461,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** * @param txEntry Entry to process. * @param metrics {@code True} if metrics should be updated. + * @param ret Optional return value to initialize. * @return Tuple containing transformation results. * @throws IgniteCheckedException If failed to get previous value for transform. * @throws GridCacheEntryRemovedException If entry was concurrently deleted. */ protected IgniteBiTuple<GridCacheOperation, CacheObject> applyTransformClosures( IgniteTxEntry txEntry, - boolean metrics) throws GridCacheEntryRemovedException, IgniteCheckedException { + boolean metrics, + @Nullable GridCacheReturn ret) throws GridCacheEntryRemovedException, IgniteCheckedException { GridCacheContext cacheCtx = txEntry.context(); assert cacheCtx != null; @@ -1457,8 +1477,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (isSystemInvalidate()) return F.t(cacheCtx.writeThrough() ? RELOAD : DELETE, null); - if (F.isEmpty(txEntry.entryProcessors())) + if (F.isEmpty(txEntry.entryProcessors())) { + if (ret != null) + ret.value(cacheCtx, txEntry.value(), txEntry.keepBinary()); + return F.t(txEntry.op(), txEntry.value()); + } else { T2<GridCacheOperation, CacheObject> calcVal = txEntry.entryProcessorCalculatedValue(); @@ -1508,17 +1532,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached()); + Object procRes = null; + Exception err = null; + try { EntryProcessor<Object, Object, Object> processor = t.get1(); - processor.process(invokeEntry, t.get2()); + procRes = processor.process(invokeEntry, t.get2()); val = invokeEntry.getValue(); key = invokeEntry.key(); } - catch (Exception ignore) { - // No-op. + catch (Exception e) { + err = e; + } + + if (ret != null) { + if (err != null || procRes != null) + ret.addEntryProcessResult(txEntry.context(), txEntry.key(), null, procRes, err, keepBinary); + else + ret.invokeResult(true); } modified |= invokeEntry.modified();
