This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 961fdd020a2 IGNITE-19873 GridNearTxLocal initial cleanup (#10813) 961fdd020a2 is described below commit 961fdd020a264c3bdf47c212a7ef545309760662 Author: Anton Vinogradov <a...@apache.org> AuthorDate: Fri Jun 30 15:32:13 2023 +0300 IGNITE-19873 GridNearTxLocal initial cleanup (#10813) --- .../cache/distributed/near/GridNearTxLocal.java | 96 ++++++++-------------- .../cache/transactions/IgniteTxManager.java | 3 +- .../TxDataConsistencyOnCommitFailureTest.java | 2 +- 3 files changed, 34 insertions(+), 67 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index d498204161c..0543c19c4ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -117,11 +117,11 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOOP; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; @@ -262,7 +262,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param taskNameHash Task name hash code. * @param lb Label. * @param txDumpsThrottling Log throttling information. - * @param tracingEnabled {@code true} if the transaction should be traced. */ public GridNearTxLocal( GridCacheSharedContext ctx, @@ -279,8 +278,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable UUID subjId, int taskNameHash, @Nullable String lb, - IgniteTxManager.TxDumpsThrottling txDumpsThrottling, - boolean tracingEnabled + IgniteTxManager.TxDumpsThrottling txDumpsThrottling ) { super( ctx, @@ -641,11 +639,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou entryProcessor, invokeArgs, retval, - /*lockOnly*/false, filters, ret, opCtx != null && opCtx.skipStore(), - /*singleRmv*/false, keepBinary, opCtx != null && opCtx.recovery(), dataCenterId); @@ -827,16 +823,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return transform ? EnlistOperation.TRANSFORM : EnlistOperation.UPSERT; } - @Override public boolean hasNextX() throws IgniteCheckedException { + @Override public boolean hasNextX() { return it.hasNext(); } - @Override public IgniteBiTuple<KeyCacheObject, Object> nextX() throws IgniteCheckedException { + @Override public IgniteBiTuple<KeyCacheObject, Object> nextX() { Map.Entry<KeyCacheObject, Object> next = it.next(); return new IgniteBiTuple<>(next.getKey(), next.getValue()); } - }, retval, filter, remainingTime(), true); + }, retval, filter, remainingTime()); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); @@ -893,18 +889,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou else dataCenterId = null; - // Cached entry may be passed only from entry wrapper. - final Map<?, ?> map0 = map; final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; if (log.isDebugEnabled()) - log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map0 + ", retval=" + retval + "]"); + log.debug("Called putAllAsync(...) [tx=" + this + ", map=" + map + ", retval=" + retval + "]"); - assert map0 != null || invokeMap0 != null; + assert map != null || invokeMap0 != null; final GridCacheReturn ret = new GridCacheReturn(localResult(), false); - if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { + if (F.isEmpty(map) && F.isEmpty(invokeMap0)) { if (implicit()) try { commit(); @@ -917,7 +911,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } try { - Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); + Set<?> keySet = map != null ? map.keySet() : invokeMap0.keySet(); final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size()); @@ -928,11 +922,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou entryTopVer, keySet, opCtx != null ? opCtx.expiry() : null, - map0, + map, invokeMap0, invokeArgs, retval, - false, CU.filterArray(null), ret, enlisted, @@ -1035,11 +1028,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param entryProcessor Entry processor (for invoke operation). * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Flag indicating whether a value should be returned. - * @param lockOnly If {@code true}, then entry will be enlisted as noop. * @param filter User filters. * @param ret Return value. * @param skipStore Skip store flag. - * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. * @param recovery Recovery flag. * @param dataCenterId Optional data center Id. * @return Future for entry values loading. @@ -1053,11 +1044,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable EntryProcessor<K, V, Object> entryProcessor, @Nullable Object[] invokeArgs, final boolean retval, - boolean lockOnly, final CacheEntryPredicate[] filter, final GridCacheReturn ret, boolean skipStore, - final boolean singleRmv, boolean keepBinary, boolean recovery, Byte dataCenterId) { @@ -1072,7 +1061,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou addActiveCache(cacheCtx, recovery); final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); - final boolean needVal = singleRmv || retval || hasFilters; + final boolean needVal = retval || hasFilters; final boolean needReadVer = needVal && (serializable() && optimistic()); if (entryProcessor != null) @@ -1088,7 +1077,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou invokeArgs, expiryPlc, retval, - lockOnly, filter, /*drVer*/drVer, /*drTtl*/-1L, @@ -1096,12 +1084,11 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou ret, /*enlisted*/null, skipStore, - singleRmv, + false, hasFilters, needVal, needReadVer, - keepBinary, - recovery); + keepBinary); if (loadMissed) { AffinityTopologyVersion topVer = topologyVersionSnapshot(); @@ -1115,7 +1102,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou filter, ret, needReadVer, - singleRmv, + false, hasFilters, /*read through*/(entryProcessor != null || cacheCtx.config().isLoadPreviousValue()) && !skipStore, retval, @@ -1159,7 +1146,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param invokeMap Map with entry processors for invoke operation. * @param invokeArgs Optional arguments for EntryProcessor. * @param retval Flag indicating whether a value should be returned. - * @param lockOnly If {@code true}, then entry will be enlisted as noop. * @param filter User filters. * @param ret Return value. * @param enlisted Collection of keys enlisted into this transaction. @@ -1181,7 +1167,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap, @Nullable Object[] invokeArgs, final boolean retval, - boolean lockOnly, final CacheEntryPredicate[] filter, final GridCacheReturn ret, Collection<KeyCacheObject> enlisted, @@ -1282,7 +1267,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou invokeArgs, expiryPlc, retval, - lockOnly, filter, drVer, drTtl, @@ -1294,8 +1278,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou hasFilters, needVal, needReadVer, - keepBinary, - recovery); + keepBinary); if (loadMissed) { if (missedForLoad == null) @@ -1357,7 +1340,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param invokeArgs Optional arguments for EntryProcessor. * @param expiryPlc Explicitly specified expiry policy for entry. * @param retval Return value flag. - * @param lockOnly Lock only flag. * @param filter Filter. * @param drVer DR version. * @param drTtl DR ttl. @@ -1380,7 +1362,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou @Nullable final Object[] invokeArgs, @Nullable final ExpiryPolicy expiryPlc, final boolean retval, - final boolean lockOnly, final CacheEntryPredicate[] filter, final GridCacheVersion drVer, final long drTtl, @@ -1392,8 +1373,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou boolean hasFilters, final boolean needVal, boolean needReadVer, - boolean keepBinary, - boolean recovery + boolean keepBinary ) throws IgniteCheckedException { boolean loadMissed = false; @@ -1482,7 +1462,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou else old = entry.rawGet(); - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + final GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { @@ -1827,7 +1807,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /*invoke map*/null, /*invoke arguments*/null, retval, - /*lock only*/false, filters, ret, enlisted, @@ -1981,7 +1960,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou try { MvccUtils.requestSnapshot(this); - beforeRemove(cacheCtx, retval, true); + beforeRemove(cacheCtx, retval); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); @@ -2033,14 +2012,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return EnlistOperation.DELETE; } - @Override public boolean hasNextX() throws IgniteCheckedException { + @Override public boolean hasNextX() { return it.hasNext(); } - @Override public KeyCacheObject nextX() throws IgniteCheckedException { + @Override public KeyCacheObject nextX() { return it.next(); } - }, retval, filter, remainingTime(), true); + }, retval, filter, remainingTime()); } /** @@ -2117,15 +2096,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @param retval Return value flag. * @param filter Filter. * @param timeout Timeout. - * @param sequential Sequential locking flag. * @return Operation future. */ private IgniteInternalFuture<GridCacheReturn> updateAsync(GridCacheContext cacheCtx, UpdateSourceIterator<?> it, boolean retval, @Nullable CacheEntryPredicate filter, - long timeout, - boolean sequential) { + long timeout) { try { final CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); @@ -2135,7 +2112,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou but possibly we can safely optimize this. */ GridNearTxEnlistFuture fut = new GridNearTxEnlistFuture(cacheCtx, this, - timeout, it, 0, sequential, filter, retval, keepBinary); + timeout, it, 0, true, filter, retval, keepBinary); fut.init(); @@ -2241,7 +2218,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou final ReadRepairStrategy readRepairStrategy, final boolean needVer) { if (F.isEmpty(keys)) - return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); + return new GridFinishedFuture<>(Collections.emptyMap()); if (cacheCtx.mvccEnabled() && !isOperationAllowed(true)) return txTypeMismatchFinishFuture(); @@ -2466,11 +2443,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou null, null, false, - false, null, null, skipStore, - false, !deserializeBinary, recovery, null); @@ -3012,7 +2987,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou cacheCtx, topVer, readThrough, - /*async*/true, keys, /*skipVals*/singleRmv, needReadVer, @@ -3115,7 +3089,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @param cacheCtx Cache context. * @param readThrough Read through flag. - * @param async if {@code True}, then loading will happen in a separate thread. * @param keys Keys. * @param skipVals Skip values flag. * @param needVer If {@code true} version is required for loaded values. @@ -3128,7 +3101,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou final GridCacheContext cacheCtx, AffinityTopologyVersion topVer, boolean readThrough, - boolean async, final Collection<KeyCacheObject> keys, final boolean skipVals, final boolean needVer, @@ -3533,22 +3505,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. */ public void resume() throws IgniteCheckedException { - resume(true, Thread.currentThread().getId()); + resume(Thread.currentThread().getId()); } /** * Resumes transaction (possibly in another thread) if it was previously suspended. * - * @param checkTimeout Whether timeout should be checked. * @param threadId Thread id to restore. * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out. */ - private void resume(boolean checkTimeout, long threadId) throws IgniteCheckedException { + private void resume(long threadId) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Resume near local tx: " + this); synchronized (this) { - checkValid(checkTimeout); + checkValid(true); cctx.tm().resumeTx(this, threadId); } @@ -4267,7 +4238,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou this, timeout, 0, - Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), + Collections.emptyMap(), req.last(), needReturnValue() && implicit()); @@ -4774,7 +4745,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou cacheCtx, topVer, !skipStore, - false, missedMap.keySet(), skipVals, needReadVer, @@ -4927,17 +4897,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @param cacheCtx Cache context. * @param retval Return value flag. - * @param mvccOp SQL operation flag. * @throws IgniteCheckedException If failed. */ - private void beforeRemove(GridCacheContext cacheCtx, boolean retval, boolean mvccOp) throws IgniteCheckedException { - assert !mvccOp || cacheCtx.mvccEnabled(); + private void beforeRemove(GridCacheContext cacheCtx, boolean retval) throws IgniteCheckedException { + assert cacheCtx.mvccEnabled(); checkUpdatesAllowed(cacheCtx); cacheCtx.checkSecurity(SecurityPermission.CACHE_REMOVE); - if (cacheCtx.mvccEnabled() && !isOperationAllowed(mvccOp)) + if (cacheCtx.mvccEnabled() && !isOperationAllowed(true)) throw new IgniteCheckedException(TX_TYPE_MISMATCH_ERR_MSG); if (retval) @@ -5093,9 +5062,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** * @param t Argument. * @return Result. - * @throws IgniteCheckedException If failed. */ - abstract T finish(T t) throws IgniteCheckedException; + abstract T finish(T t); } /** */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index dcd3092a593..ca331ad3e41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -787,8 +787,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { securitySubjectId(cctx), taskNameHash, lb, - txDumpsThrottling, - tracingEnabled + txDumpsThrottling ); if (tx.system()) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java index 39055d6bc75..76509672a37 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDataConsistencyOnCommitFailureTest.java @@ -224,7 +224,7 @@ public class TxDataConsistencyOnCommitFailureTest extends GridCommonAbstractTest boolean storeEnabled, Boolean mvccOp, int txSize, @Nullable UUID subjId, int taskNameHash, @Nullable String lb, IgniteTxManager.TxDumpsThrottling txDumpsThrottling) { super(ctx, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, storeEnabled, mvccOp, - txSize, subjId, taskNameHash, lb, txDumpsThrottling, false); + txSize, subjId, taskNameHash, lb, txDumpsThrottling); } /** {@inheritDoc} */