Internal cache API cleanup.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/decb0c7a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/decb0c7a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/decb0c7a Branch: refs/heads/ignite-1192 Commit: decb0c7aa62f9354b25ee0a09ca19b424a688e8b Parents: 82f016f Author: sboikov <[email protected]> Authored: Thu Mar 16 18:25:36 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Mar 16 18:25:36 2017 +0300 ---------------------------------------------------------------------- .../ClientAbstractMultiNodeSelfTest.java | 13 +- .../ignite/internal/IgniteTransactionsEx.java | 8 +- .../processors/cache/GridCacheAdapter.java | 98 +- .../processors/cache/GridCacheProxyImpl.java | 6 +- .../cache/GridCacheSharedContext.java | 11 +- .../processors/cache/GridCacheUtils.java | 6 +- .../processors/cache/IgniteInternalCache.java | 5 +- .../distributed/GridCacheCommittedTxInfo.java | 117 - .../GridDistributedCacheAdapter.java | 2 +- .../GridDistributedTxRemoteAdapter.java | 59 +- .../dht/GridDhtTransactionalCacheAdapter.java | 57 +- .../cache/distributed/dht/GridDhtTxLocal.java | 126 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 28 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 65 +- .../dht/colocated/GridDhtColocatedCache.java | 8 +- .../colocated/GridDhtColocatedLockFuture.java | 7 +- .../distributed/near/GridNearLockFuture.java | 4 +- .../distributed/near/GridNearLockRequest.java | 200 +- .../near/GridNearTransactionalCache.java | 6 +- .../near/GridNearTxFinishFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 2732 ++++++++++++++++- .../near/GridNearTxPrepareFutureAdapter.java | 5 +- .../near/GridNearTxPrepareRequest.java | 2 +- .../distributed/near/GridNearTxRemote.java | 4 +- .../store/GridCacheStoreManagerAdapter.java | 142 +- .../cache/transactions/IgniteInternalTx.java | 80 +- .../transactions/IgniteTransactionsImpl.java | 12 +- .../cache/transactions/IgniteTxAdapter.java | 165 +- .../cache/transactions/IgniteTxEntry.java | 4 +- .../cache/transactions/IgniteTxHandler.java | 67 +- .../IgniteTxImplicitSingleStateImpl.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 2801 ++---------------- .../cache/transactions/IgniteTxLocalEx.java | 145 +- .../cache/transactions/IgniteTxManager.java | 293 +- .../cache/transactions/IgniteTxRemoteEx.java | 11 + .../IgniteTxRemoteStateAdapter.java | 2 +- .../cache/transactions/IgniteTxState.java | 2 +- .../cache/transactions/IgniteTxStateImpl.java | 4 +- .../transactions/TransactionProxyImpl.java | 13 +- .../datastructures/DataStructuresProcessor.java | 32 +- .../datastructures/GridCacheAtomicLongImpl.java | 18 +- .../GridCacheAtomicReferenceImpl.java | 6 +- .../GridCacheAtomicSequenceImpl.java | 4 +- .../GridCacheAtomicStampedImpl.java | 6 +- .../GridCacheCountDownLatchImpl.java | 6 +- .../datastructures/GridCacheLockImpl.java | 11 +- .../datastructures/GridCacheSemaphoreImpl.java | 18 +- .../GridTransactionalCacheQueueImpl.java | 10 +- .../processors/igfs/IgfsDataManager.java | 55 +- .../processors/igfs/IgfsMetaManager.java | 73 +- .../service/GridServiceProcessor.java | 6 +- .../internal/TestRecordingCommunicationSpi.java | 29 + .../processors/cache/GridCacheTestStore.java | 2 - .../cache/IgniteTxConfigCacheSelfTest.java | 4 +- .../IgniteCacheSystemTransactionsSelfTest.java | 7 +- .../IgniteTxCachePrimarySyncTest.java | 5 + ...xOriginatingNodeFailureAbstractSelfTest.java | 6 +- ...cOriginatingNodeFailureAbstractSelfTest.java | 4 +- ...ePrimaryNodeFailureRecoveryAbstractTest.java | 9 +- .../dht/IgniteCacheTxRecoveryRollbackTest.java | 501 ++++ .../GridCachePartitionedTxSalvageSelfTest.java | 7 +- .../TxOptimisticDeadlockDetectionTest.java | 30 +- ...lockMessageSystemPoolStarvationSelfTest.java | 6 +- .../IgniteCacheRestartTestSuite2.java | 3 +- .../IgniteCacheTxRecoverySelfTestSuite.java | 3 + .../HibernateReadWriteAccessStrategy.java | 12 +- .../processors/cache/jta/CacheJtaManager.java | 3 +- .../processors/cache/jta/CacheJtaResource.java | 8 +- 68 files changed, 4148 insertions(+), 4054 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index 7fb2385..2fba49a 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; @@ -480,11 +481,11 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract @SuppressWarnings("unchecked") private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { checkSyncFlags((GridIoMessage)msg); - super.sendMessage(node, msg, ackClosure); + super.sendMessage(node, msg, ackC); } /** @@ -512,13 +513,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract IgniteInternalTx t = tm.tx(v); if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1")))) - assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode()); + assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2")))) - assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode()); + assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3")))) - assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode()); + assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode()); else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4")))) - assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode()); + assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java index 9772dcc..4133ddc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -35,7 +35,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions { * @param txSize Number of entries participating in transaction (may be approximate). * @return New transaction. */ - public IgniteInternalTx txStartEx(GridCacheContext ctx, + public GridNearTxLocal txStartEx(GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -47,5 +47,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions { * @param isolation Isolation. * @return New transaction. */ - public IgniteInternalTx txStartEx(GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation); + public GridNearTxLocal txStartEx(GridCacheContext ctx, + TransactionConcurrency concurrency, + TransactionIsolation isolation); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 71be718..3bfd1f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -86,18 +86,16 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; -import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater; -import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -1876,7 +1874,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); - IgniteTxLocalAdapter tx = null; + GridNearTxLocal tx = null; if (checkTx) { try { @@ -2132,7 +2130,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } else { return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { - @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, readyTopVer, keys, @@ -2187,7 +2185,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter) throws IgniteCheckedException { return syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException { return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value(); } @@ -2237,7 +2235,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable final CacheEntryPredicate filter) { return asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.putAsync(ctx, readyTopVer, key, val, true, filter) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2293,7 +2291,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V protected boolean put0(final K key, final V val, final CacheEntryPredicate filter) throws IgniteCheckedException { Boolean res = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException { return tx.putAsync(ctx, null, key, val, false, filter).get().success(); } @@ -2316,7 +2314,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(drMap.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { tx.putAllDrAsync(ctx, drMap).get(); } @@ -2335,7 +2333,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncOp(drMap.keySet()) { - @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.putAllDrAsync(ctx, drMap); } @@ -2380,7 +2378,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { - @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) + @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx) throws IgniteCheckedException { assert topVer == null || tx.implicit(); @@ -2418,7 +2416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { @@ -2448,7 +2446,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() { - @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); @@ -2491,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, + @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { @Override public EntryProcessor apply(K k) { @@ -2532,7 +2530,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(map.keySet()); IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.invokeAsync(ctx, readyTopVer, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, @@ -2568,7 +2566,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(map.keySet()); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); @@ -2616,7 +2614,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, @Nullable final CacheEntryPredicate filter) { return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.putAsync(ctx, readyTopVer, key, @@ -2721,7 +2719,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException { syncOp(new SyncInOp(m.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { tx.putAllAsync(ctx, null, m, false).get(); } @@ -2748,7 +2746,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) { return asyncOp(new AsyncOp(m.keySet()) { - @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.putAllAsync(ctx, readyTopVer, m, @@ -2789,7 +2787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean keepBinary = ctx.keepBinary(); return syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException { K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key; V ret = tx.removeAllAsync(ctx, @@ -2839,7 +2837,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) { return asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { // TODO should we invoke interceptor here? return tx.removeAllAsync(ctx, readyTopVer, @@ -2897,7 +2895,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException { syncOp(new SyncInOp(keys.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { tx.removeAllAsync(ctx, null, keys, @@ -2938,7 +2936,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) { return asyncOp(new AsyncOp(keys) { - @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, readyTopVer, keys, @@ -2990,7 +2988,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException { Boolean res = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, null, Collections.singletonList(key), @@ -3046,7 +3044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) { return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, readyTopVer, Collections.singletonList(key), @@ -3071,8 +3069,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(false) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - tx.removeAllDrAsync(ctx, (Map)drMap).get(); + @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { + tx.removeAllDrAsync(ctx, drMap).get(); } @Override public String toString() { @@ -3090,8 +3088,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncOp(drMap.keySet()) { - @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { - return tx.removeAllDrAsync(ctx, (Map)drMap); + @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { + return tx.removeAllDrAsync(ctx, drMap); } @Override public String toString() { @@ -3160,10 +3158,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public Transaction tx() { - IgniteTxAdapter tx = ctx.tm().threadLocalTx(ctx); - - return tx == null ? null : new TransactionProxyImpl<>(tx, ctx.shared(), false); + @Nullable @Override public GridNearTxLocal tx() { + return ctx.tm().threadLocalTx(ctx); } /** {@inheritDoc} */ @@ -3291,7 +3287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { + @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); return txs.txStartEx(ctx, concurrency, isolation); @@ -4142,7 +4138,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Transaction commit future. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final IgniteInternalTx tx) { + IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) { FutureHolder holder = lastFut.get(); holder.lock(); @@ -4154,7 +4150,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut, new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) { - return tx.commitAsync(); + return tx.commitNearTxLocalAsync(); } }); @@ -4163,7 +4159,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return f; } - IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync(); + IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync(); saveFuture(holder, f); @@ -4208,7 +4204,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V awaitLastFut(); - IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); @@ -4304,7 +4300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (log.isDebugEnabled()) log.debug("Performing async op: " + op); - IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -4348,7 +4344,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ @SuppressWarnings("unchecked") protected <T> IgniteInternalFuture<T> asyncOp( - IgniteTxLocalAdapter tx, + GridNearTxLocal tx, final AsyncOp<T> op, final CacheOperationContext opCtx ) { @@ -4364,7 +4360,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { IgniteInternalFuture fut = holder.future(); - final IgniteTxLocalAdapter tx0 = tx; + final GridNearTxLocal tx0 = tx; if (fut != null && !fut.isDone()) { IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut, @@ -4383,7 +4379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throw e; } catch (IgniteCheckedException e1) { - tx0.rollbackAsync(); + tx0.rollbackNearTxLocalAsync(); throw e1; } @@ -4409,7 +4405,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V throw e; } catch (IgniteCheckedException e1) { - tx0.rollbackAsync(); + tx0.rollbackNearTxLocalAsync(); throw e1; } @@ -4925,7 +4921,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private int retries; /** */ - private IgniteTxLocalAdapter tx; + private GridNearTxLocal tx; /** */ private CacheOperationContext opCtx; @@ -5173,7 +5169,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Operation return value. * @throws IgniteCheckedException If failed. */ - @Nullable public abstract T op(IgniteTxLocalAdapter tx) throws IgniteCheckedException; + @Nullable public abstract T op(GridNearTxLocal tx) throws IgniteCheckedException; } /** @@ -5188,7 +5184,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Nullable @Override public final Object op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Nullable @Override public final Object op(GridNearTxLocal tx) throws IgniteCheckedException { inOp(tx); return null; @@ -5198,7 +5194,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param tx Transaction. * @throws IgniteCheckedException If failed. */ - public abstract void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException; + public abstract void inOp(GridNearTxLocal tx) throws IgniteCheckedException; } /** @@ -5234,14 +5230,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param readyTopVer Ready topology version. * @return Operation future. */ - public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer); + public abstract IgniteInternalFuture<T> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer); /** * @param tx Transaction. * @param opCtx Operation context. * @return Operation future. */ - public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) { + public IgniteInternalFuture<T> op(final GridNearTxLocal tx, CacheOperationContext opCtx) { AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot(); if (txTopVer != null) @@ -5267,7 +5263,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut, final AffinityTopologyVersion topVer, - final IgniteTxLocalAdapter tx, + final GridNearTxLocal tx, final CacheOperationContext opCtx) { final GridFutureAdapter fut0 = new GridFutureAdapter(); @@ -5304,7 +5300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param opCtx Operation context. * @return Future. */ - private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx, + private IgniteInternalFuture<T> runOp(GridNearTxLocal tx, AffinityTopologyVersion topVer, CacheOperationContext opCtx) { ctx.operationContextPerCall(opCtx); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 00898ec..787a767 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -40,8 +40,8 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -939,7 +939,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { + @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { CacheOperationContext prev = gate.enter(opCtx); try { @@ -977,7 +977,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ - @Override public Transaction tx() { + @Override public GridNearTxLocal tx() { CacheOperationContext prev = gate.enter(opCtx); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 0f79100..39a3baa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -731,7 +732,7 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to close. * @throws IgniteCheckedException If failed. */ - public void endTx(IgniteInternalTx tx) throws IgniteCheckedException { + public void endTx(GridNearTxLocal tx) throws IgniteCheckedException { tx.txState().awaitLastFut(this); tx.close(); @@ -742,13 +743,13 @@ public class GridCacheSharedContext<K, V> { * @return Commit future. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) { + public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) { GridCacheContext ctx = tx.txState().singleCacheContext(this); if (ctx == null) { tx.txState().awaitLastFut(this); - return tx.commitAsync(); + return tx.commitNearTxLocalAsync(); } else return ctx.cache().commitTxAsync(tx); @@ -759,10 +760,10 @@ public class GridCacheSharedContext<K, V> { * @throws IgniteCheckedException If failed. * @return Rollback future. */ - public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { + public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException { tx.txState().awaitLastFut(this); - return tx.rollbackAsync(); + return tx.rollbackNearTxLocalAsync(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 3e68b70..7131612 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -97,7 +98,6 @@ import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.LOCAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -891,7 +891,7 @@ public class GridCacheUtils { * @param isolation Isolation. * @return New transaction. */ - public static IgniteInternalTx txStartInternal(GridCacheContext ctx, IgniteInternalCache prj, + public static GridNearTxLocal txStartInternal(GridCacheContext ctx, IgniteInternalCache prj, TransactionConcurrency concurrency, TransactionIsolation isolation) { assert ctx != null; assert prj != null; @@ -1257,7 +1257,7 @@ public class GridCacheUtils { public static <K, V> void inTx(IgniteInternalCache<K, V> cache, TransactionConcurrency concurrency, TransactionIsolation isolation, IgniteInClosureX<IgniteInternalCache<K ,V>> clo) throws IgniteCheckedException { - try (IgniteInternalTx tx = cache.txStartEx(concurrency, isolation);) { + try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation);) { clo.applyx(cache); tx.commit(); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 0ac98fb..5471335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -43,6 +43,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -952,7 +953,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @param isolation Isolation. * @return New transaction. */ - public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation); + public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation); /** * Starts transaction with specified isolation, concurrency, timeout, invalidation flag, @@ -976,7 +977,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { * @return Transaction started by this thread or {@code null} if this thread * does not have a transaction. */ - @Nullable public Transaction tx(); + @Nullable public GridNearTxLocal tx(); /** * Evicts entry associated with given key from cache. Note, that entry will be evicted http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java deleted file mode 100644 index 875ada0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Collections; -import java.util.UUID; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Committed transaction information. Contains recovery writes that will be used to set commit values - * in case if originating node crashes. - */ -@Deprecated -public class GridCacheCommittedTxInfo implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Originating transaction ID. */ - private GridCacheVersion originatingTxId; - - /** Originating node ID. */ - private UUID originatingNodeId; - - /** Recovery writes, i.e. values that have never been sent to remote nodes. */ - @GridToStringInclude - private Collection<IgniteTxEntry> recoveryWrites; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridCacheCommittedTxInfo() { - // No-op. - } - - /** - * @param tx Committed cache transaction. - */ - public GridCacheCommittedTxInfo(IgniteInternalTx tx) { - assert !tx.local() || !tx.replicated(); - - originatingTxId = tx.nearXidVersion(); - originatingNodeId = tx.eventNodeId(); - } - - /** - * @return Originating transaction ID (the transaction ID for replicated cache and near transaction ID - * for partitioned cache). - */ - public GridCacheVersion originatingTxId() { - return originatingTxId; - } - - /** - * @return Originating node ID (the local transaction node ID for replicated cache and near node ID - * for partitioned cache). - */ - public UUID originatingNodeId() { - return originatingNodeId; - } - - /** - * @return Collection of recovery writes. - */ - public Collection<IgniteTxEntry> recoveryWrites() { - return recoveryWrites == null ? Collections.<IgniteTxEntry>emptyList() : recoveryWrites; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - originatingTxId.writeExternal(out); - - U.writeUuid(out, originatingNodeId); - - U.writeCollection(out, recoveryWrites); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - originatingTxId = new GridCacheVersion(); - - originatingTxId.readExternal(in); - - originatingNodeId = U.readUuid(in); - - recoveryWrites = U.readCollection(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheCommittedTxInfo.class, this, "recoveryWrites", recoveryWrites); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 7e4deff..00bc6d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -111,7 +111,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout) { - IgniteTxLocalEx tx = ctx.tm().userTxx(); + IgniteTxLocalEx tx = ctx.tm().userTx(); // Return value flag is true because we choose to bring values for explicit locks. return lockAllAsync(ctx.cacheKeysView(keys), http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 68c0e57..b31a7be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -83,7 +83,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN; /** * Transaction created by system implicitly on remote nodes. */ -public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter +public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter implements IgniteTxRemoteEx { /** */ private static final long serialVersionUID = 0L; @@ -180,11 +180,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public Collection<UUID> masterNodeIds() { - return Collections.singleton(nodeId); - } - - /** {@inheritDoc} */ @Override public UUID originatingNodeId() { return nodeId; } @@ -347,12 +342,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() { - assert false; - return null; - } - - /** {@inheritDoc} */ @Override public Set<IgniteTxKey> readSet() { return txState.readSet(); } @@ -378,11 +367,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** - * Prepare phase. - * - * @throws IgniteCheckedException If prepare failed. + * @throws IgniteCheckedException If failed. */ - @Override public void prepare() throws IgniteCheckedException { + public final void prepareRemoteTx() throws IgniteCheckedException { // If another thread is doing prepare or rollback. if (!state(PREPARING)) { // In optimistic mode prepare may be called multiple times. @@ -729,7 +716,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public void commit() throws IgniteCheckedException { + @Override public final void commitRemoteTx() throws IgniteCheckedException { if (optimistic()) state(PREPARED); @@ -748,7 +735,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (!isSystemInvalidate()) throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']'); - rollback(); + rollbackRemoteTx(); } commitIfLocked(); @@ -766,7 +753,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { try { - commit(); + commitRemoteTx(); return new GridFinishedFuture<IgniteInternalTx>(this); } @@ -776,8 +763,36 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @SuppressWarnings({"CatchGenericClass"}) - @Override public void rollback() { + @Override public final IgniteInternalFuture<?> salvageTx() { + try { + systemInvalidate(true); + + prepareRemoteTx(); + + if (state() == PREPARING) { + if (log.isDebugEnabled()) + log.debug("Ignoring transaction in PREPARING state as it is currently handled " + + "by another thread: " + this); + + return null; + } + + doneRemote(xidVersion(), + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList()); + + commitRemoteTx(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to invalidate transaction: " + xidVersion(), e); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public final void rollbackRemoteTx() { try { // Note that we don't evict near entries here - // they will be deleted by their corresponding transactions. @@ -796,7 +811,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { - rollback(); + rollbackRemoteTx(); return new GridFinishedFuture<IgniteInternalTx>(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index dea4072..1e09eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -178,7 +178,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @throws GridDistributedLockCancelledException If lock has been cancelled. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable GridDhtTxRemote startRemoteTx(UUID nodeId, + @Nullable private GridDhtTxRemote startRemoteTx(UUID nodeId, GridDhtLockRequest req, GridDhtLockResponse res) throws IgniteCheckedException, GridDistributedLockCancelledException { @@ -307,7 +307,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx.state() == COMMITTING) tx.forceCommit(); else - tx.rollback(); + tx.rollbackRemoteTx(); } return null; @@ -362,7 +362,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']'); - tx.rollback(); + tx.rollbackRemoteTx(); tx = null; } @@ -374,7 +374,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { + private void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) { if (txLockMsgLog.isDebugEnabled()) { txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() + ", dhtTxId=" + req.version() + @@ -452,7 +452,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { + private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) { assert nodeId != null; assert req != null; assert !nodeId.equals(locNodeId); @@ -561,10 +561,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (fail) { if (dhtTx != null) - dhtTx.rollback(); + dhtTx.rollbackRemoteTx(); if (nearTx != null) // Even though this should never happen, we leave this check for consistency. - nearTx.rollback(); + nearTx.rollbackRemoteTx(); List<KeyCacheObject> keys = req.keys(); @@ -602,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach * @param nodeId Node ID. * @param req Request. */ - protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) { + private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) { clearLocks(nodeId, req); if (isNearEnabled(cacheCfg)) @@ -961,8 +961,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.futureId(), req.miniId(), req.threadId(), - req.implicitTx(), - req.implicitSingleTx(), + /*implicitTx*/false, + /*implicitSingleTx*/false, ctx.systemTx(), false, ctx.ioPolicy(), @@ -989,7 +989,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach U.warn(log, msg); if (tx != null) - tx.rollback(); + tx.rollbackDhtLocal(); return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); } @@ -1038,31 +1038,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach t.xidVersion(), e); - if (resp.error() == null && t.onePhaseCommit()) { - assert t.implicit(); - - return t.commitAsync().chain( - new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { - @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { - try { - // Check for error. - f.get(); - } - catch (IgniteCheckedException e1) { - resp.error(e1); - } - - sendLockReply(nearNode, t, req, resp); - - return resp; - } - }); - } - else { - sendLockReply(nearNode, t, req, resp); + assert !t.implicit() : t; + assert !t.onePhaseCommit() : t; - return new GridFinishedFuture<>(resp); - } + sendLockReply(nearNode, t, req, resp); + + return new GridFinishedFuture<>(resp); } } ); @@ -1105,7 +1086,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx != null) { try { - tx.rollback(); + tx.rollbackDhtLocal(); } catch (IgniteCheckedException ex) { U.error(log, "Failed to rollback the transaction: " + tx, ex); @@ -1309,7 +1290,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach */ private void sendLockReply( ClusterNode nearNode, - @Nullable IgniteInternalTx tx, + @Nullable GridDhtTxLocal tx, GridNearLockRequest req, GridNearLockResponse res ) { @@ -1347,7 +1328,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach ", res=" + res + ']', e); if (tx != null) - tx.rollbackAsync(); + tx.rollbackDhtLocalAsync(); // Convert to closure exception as this method is only called form closures. throw new GridClosureException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index bff69bc..b1c7e5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -292,82 +292,22 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<?> prepareAsync() { - if (optimistic()) { - assert isSystemInvalidate(); - - return prepareAsync( - null, - null, - Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), - 0, - nearMiniId, - null, - true); - } - - long timeout = remainingTime(); + @Override public IgniteInternalFuture<?> salvageTx() { + systemInvalidate(true); - // For pessimistic mode we don't distribute prepare request. - GridDhtTxPrepareFuture fut = prepFut; + state(PREPARED); - if (fut == null) { - // Future must be created before any exception can be thrown. - if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( - cctx, - this, - timeout, - nearMiniId, - Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), - true, - needReturnValue()))) { - if (timeout == -1) - prepFut.onError(timeoutException()); + if (state() == PREPARING) { + if (log.isDebugEnabled()) + log.debug("Ignoring transaction in PREPARING state as it is currently handled " + + "by another thread: " + this); - return prepFut; - } - } - else - // Prepare was called explicitly. - return fut; - - if (!state(PREPARING)) { - if (setRollbackOnly()) { - if (timeout == -1) - fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + - this)); - else - fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + - ", tx=" + this + ']')); - } - else - fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + - state() + ", tx=" + this + ']')); - - return fut; + return null; } - try { - userPrepare(); - - if (!state(PREPARED)) { - setRollbackOnly(); - - fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() + - ", tx=" + this + ']')); - - return fut; - } + setRollbackOnly(); - fut.complete(); - - return fut; - } - catch (IgniteCheckedException e) { - fut.onError(e); - - return fut; - } + return rollbackDhtLocalAsync(); } /** @@ -382,7 +322,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param last {@code True} if this is last prepare request. * @return Future that will be completed when locks are acquired. */ - public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( + public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, Map<IgniteTxKey, GridCacheVersion> verMap, @@ -478,7 +418,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e)); try { - rollback(); + rollbackDhtLocal(); } catch (IgniteTxOptimisticCheckedException e1) { if (log.isDebugEnabled()) @@ -523,7 +463,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (prepFut != null) prepFut.get(); // Check for errors. - boolean finished = finish(commit); + boolean finished = localFinish(commit); if (!finished) err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit + @@ -544,16 +484,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa fut.finish(commit); } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + /** + * @return Commit future. + */ + public IgniteInternalFuture<IgniteInternalTx> commitDhtLocalAsync() { if (log.isDebugEnabled()) log.debug("Committing dht local tx: " + this); - // In optimistic mode prepare was called explicitly. - if (pessimistic()) - prepareAsync(); - final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true); cctx.mvcc().addFuture(fut, fut.futureId()); @@ -581,15 +518,29 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() { + return commitDhtLocalAsync(); + } + + /** {@inheritDoc} */ @Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) { assert optimistic(); PREP_FUT_UPD.compareAndSet(this, fut, null); } - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { + /** + * @throws IgniteCheckedException If failed. + */ + public void rollbackDhtLocal() throws IgniteCheckedException { + rollbackDhtLocalAsync().get(); + } + + /** + * @return Rollback future. + */ + public IgniteInternalFuture<IgniteInternalTx> rollbackDhtLocalAsync() { final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false); cctx.mvcc().addFuture(fut, fut.futureId()); @@ -612,8 +563,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() { + return rollbackDhtLocalAsync(); + } + + /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean finish(boolean commit) throws IgniteCheckedException { + @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate() || onePhaseCommit() || state() == PREPARED : "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + @@ -621,7 +577,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert nearMiniId != 0; - return super.finish(commit); + return super.localFinish(commit); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 67e1993..0329386 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -161,7 +160,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @param node Node. */ - public void addLockTransactionNode(ClusterNode node) { + void addLockTransactionNode(ClusterNode node) { assert node != null; assert !node.isLocal(); @@ -185,7 +184,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * * @return Has near cache flag. */ - public boolean nearOnOriginatingNode() { + boolean nearOnOriginatingNode() { return nearOnOriginatingNode; } @@ -206,7 +205,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @return Nodes where transactions were started on lock step. */ - @Nullable public Set<ClusterNode> lockTransactionNodes() { + @Nullable Set<ClusterNode> lockTransactionNodes() { return lockTxNodes; } @@ -349,14 +348,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @param mappings Mappings to add. */ - void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { + private void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { addMapping(mappings, dhtMap); } /** * @param mappings Mappings to add. */ - void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { + private void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { addMapping(mappings, nearMap); } @@ -654,7 +653,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { needRetVal, createTtl, accessTtl, - null, skipStore, keepBinary); } @@ -673,7 +671,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param needRetVal Return value flag. * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. - * @param filter Entry write filter. * @param skipStore Skip store flag. * @return Future for lock acquisition. */ @@ -685,7 +682,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { final boolean needRetVal, final long createTtl, final long accessTtl, - @Nullable final CacheEntryPredicate[] filter, boolean skipStore, boolean keepBinary) { if (log.isDebugEnabled()) @@ -729,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /*retval*/false, /*read*/read, accessTtl, - filter == null ? CU.empty0() : filter, + CU.empty0(), /*computeInvoke*/false); return ret; @@ -740,7 +736,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"}) - @Override public boolean finish(boolean commit) throws IgniteCheckedException { + @Override public boolean localFinish(boolean commit) throws IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]"); @@ -858,16 +854,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** {@inheritDoc} */ - @Override public void rollback() throws IgniteCheckedException { - try { - rollbackAsync().get(); - } - finally { - cctx.tm().resetContext(); - } - } - - /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 56884ff..93ea30d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -718,18 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo = new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { - try { - if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + tx.nearNodeId() + - ", res=" + res, - ", tx=" + tx, - e); - } + if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1)) + sendPrepareResponse(res); } }; @@ -761,18 +751,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } else { - try { - if (REPLIED_UPD.compareAndSet(this, 0, 1)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + tx.nearNodeId() + - ", res=" + res, - ", tx=" + tx, - e); - } + if (REPLIED_UPD.compareAndSet(this, 0, 1)) + sendPrepareResponse(res); } return true; @@ -784,14 +764,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { sendPrepareResponse(res); } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + tx.nearNodeId() + - ", res=" + res, - ", tx=" + tx, - e); - } finally { // Will call super.onDone(). onComplete(res); @@ -819,9 +791,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param res Response. - * @throws IgniteCheckedException If failed to send response. */ - private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException { + private void sendPrepareResponse(GridNearTxPrepareResponse res) { if (!tx.nearNodeId().equals(cctx.localNodeId())) { Throwable err = this.err; @@ -837,13 +808,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return; } - cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); + try { + cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() + + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res + ']'); + } + } + catch (ClusterTopologyCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Failed to send prepare response, node left [txId=" + tx.nearXidVersion() + "," + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + tx.nearNodeId() + + ", res=" + res + ']'); + } + } + catch (IgniteCheckedException e) { + U.error(msgLog, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," + ", dhtTxId=" + tx.xidVersion() + ", node=" + tx.nearNodeId() + - ", res=" + res + ']'); + ", res=" + res, + ", tx=" + tx + ']', + e); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index e1e0ec2..03bbfe0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -207,13 +207,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKey(key); - IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx, readyTopVer, Collections.singleton(ctx.toCacheKeyObject(key)), @@ -289,13 +289,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKeys(keys); - IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, readyTopVer, ctx.cacheKeysView(keys), http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 0ce380d..79c15fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -917,6 +917,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture first = false; } + assert !implicitTx() && !implicitSingleTx() : tx; + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -925,8 +927,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture futId, lockVer, inTx(), - implicitTx(), - implicitSingleTx(), read, retval, isolation(), @@ -982,9 +982,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } } - if (inTx() && req != null) - req.hasTransforms(tx.hasTransforms()); - if (!distributedKeys.isEmpty()) { mapping.distributedKeys(distributedKeys); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index ffc84d8..1948df0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1045,6 +1045,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean first = false; } + assert !implicitTx() && !implicitSingleTx() : tx; + req = new GridNearLockRequest( cctx.cacheId(), topVer, @@ -1053,8 +1055,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean futId, lockVer, inTx(), - implicitTx(), - implicitSingleTx(), read, retval, isolation(),
