http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index f5687a0..307c348 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -17,21 +17,8 @@ package org.apache.ignite.internal.processors.cache.transactions; -import java.util.Collection; -import java.util.Map; -import javax.cache.Cache; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.lang.GridInClosure3; import org.jetbrains.annotations.Nullable; /** @@ -59,141 +46,11 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { public void userRollback() throws IgniteCheckedException; /** - * @param cacheCtx Cache context. - * @param keys Keys to get. - * @param deserializeBinary Deserialize binary flag. - * @param skipVals Skip values flag. - * @param keepCacheObjects Keep cache objects - * @param skipStore Skip store flag. - * @return Future for this get. - */ - public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Collection<KeyCacheObject> keys, - boolean deserializeBinary, - boolean skipVals, - boolean keepCacheObjects, - boolean skipStore, - boolean needVer); - - /** - * @param cacheCtx Cache context. - * @param map Map to put. - * @param retval Flag indicating whether a value should be returned. - * @return Future for put operation. - */ - public <K, V> IgniteInternalFuture<GridCacheReturn> putAllAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Map<? extends K, ? extends V> map, - boolean retval); - - /** - * @param cacheCtx Cache context. - * @param key Key. - * @param val Value. - * @param retval Return value flag. - * @param filter Filter. - * @return Future for put operation. - */ - public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - K key, - V val, - boolean retval, - CacheEntryPredicate filter); - - /** - * @param cacheCtx Cache context. - * @param key Key. - * @param entryProcessor Entry processor. - * @param invokeArgs Optional arguments for entry processor. - * @return Operation future. - */ - public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - K key, - EntryProcessor<K, V, Object> entryProcessor, - Object... invokeArgs); - - /** - * @param cacheCtx Cache context. - * @param map Entry processors map. - * @param invokeArgs Optional arguments for entry processor. - * @return Operation future. - */ - public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Map<? extends K, ? extends EntryProcessor<K, V, Object>> map, - Object... invokeArgs); - - /** - * @param cacheCtx Cache context. - * @param keys Keys to remove. - * @param retval Flag indicating whether a value should be returned. - * @param filter Filter. - * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. - * @return Future for asynchronous remove. - */ - public <K, V> IgniteInternalFuture<GridCacheReturn> removeAllAsync( - GridCacheContext cacheCtx, - @Nullable AffinityTopologyVersion entryTopVer, - Collection<? extends K> keys, - boolean retval, - CacheEntryPredicate filter, - boolean singleRmv); - - /** - * @param cacheCtx Cache context. - * @param drMap DR map to put. - * @return Future for DR put operation. - */ - public IgniteInternalFuture<?> putAllDrAsync( - GridCacheContext cacheCtx, - Map<KeyCacheObject, GridCacheDrInfo> drMap); - - /** - * @param cacheCtx Cache context. - * @param drMap DR map. - * @return Future for asynchronous remove. - */ - public IgniteInternalFuture<?> removeAllDrAsync( - GridCacheContext cacheCtx, - Map<KeyCacheObject, GridCacheVersion> drMap); - - /** * Finishes transaction (either commit or rollback). * * @param commit {@code True} if commit, {@code false} if rollback. * @return {@code True} if state has been changed. * @throws IgniteCheckedException If finish failed. */ - public boolean finish(boolean commit) throws IgniteCheckedException; - - /** - * @param cacheCtx Cache context. - * @param readThrough Read through flag. - * @param async if {@code True}, then loading will happen in a separate thread. - * @param keys Keys. - * @param skipVals Skip values flag. - * @param needVer If {@code true} version is required for loaded values. - * @param c Closure to be applied for loaded values. - * @param expiryPlc Expiry policy. - * @return Future with {@code True} value if loading took place. - */ - public IgniteInternalFuture<Void> loadMissing( - GridCacheContext cacheCtx, - AffinityTopologyVersion topVer, - boolean readThrough, - boolean async, - Collection<KeyCacheObject> keys, - boolean skipVals, - boolean needVer, - boolean keepBinary, - final ExpiryPolicy expiryPlc, - GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c); + public boolean localFinish(boolean commit) throws IgniteCheckedException; }
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index df3bad2..d1334ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -56,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; -import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; @@ -82,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; @@ -127,7 +125,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Slow tx warn timeout (initialized to 0). */ private static final int SLOW_TX_WARN_TIMEOUT = Integer.getInteger(IGNITE_SLOW_TX_WARN_TIMEOUT, 0); - /** Tx salvage timeout (default 3s). */ + /** Tx salvage timeout. */ private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100); /** One phase commit deferred ack request timeout. */ @@ -138,9 +136,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private static final int DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE = Integer.getInteger(IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE, 256); - /** Version in which deadlock detection introduced. */ - public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19"); - /** Deadlock detection maximum iterations. */ static int DEADLOCK_MAX_ITERS = IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); @@ -184,7 +179,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { PER_SEGMENT_Q); /** Pending one phase commit ack requests sender. */ - private GridDeferredAckMessageSender deferredAckMessageSender; + private GridDeferredAckMessageSender deferredAckMsgSnd; /** Transaction finish synchronizer. */ private GridCacheTxFinishSync txFinishSync; @@ -206,54 +201,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private TxDeadlockDetection txDeadlockDetection; /** {@inheritDoc} */ - @Override protected void onKernalStart0(boolean reconnect) { - if (reconnect) - return; - - cctx.gridEvents().addLocalEventListener( - new GridLocalEventListener() { - @Override public void onEvent(Event evt) { - assert evt instanceof DiscoveryEvent; - assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; - - DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - - UUID nodeId = discoEvt.eventNode().id(); - - cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId)); - - if (txFinishSync != null) - txFinishSync.onNodeLeft(nodeId); - - for (TxDeadlockFuture fut : deadlockDetectFuts.values()) - fut.onNodeLeft(nodeId); - - for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) { - Object obj = entry.getValue(); - - if (obj instanceof GridCacheReturnCompletableWrapper && - nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId())) - removeTxReturn(entry.getKey()); - } - } - }, - EVT_NODE_FAILED, EVT_NODE_LEFT); - - this.txDeadlockDetection = new TxDeadlockDetection(cctx); - - cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); - - for (IgniteInternalTx tx : idMap.values()) { - if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) { - if (log.isDebugEnabled()) - log.debug("Remaining transaction from left node: " + tx); - - salvageTx(tx, true, USER_FINISH); - } - } - } - - /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_TX); } @@ -264,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txHnd = new IgniteTxHandler(cctx); - deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { + deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { @Override public int getTimeout() { return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; } @@ -293,6 +240,40 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } } }; + + cctx.gridEvents().addLocalEventListener( + new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + assert evt instanceof DiscoveryEvent; + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + DiscoveryEvent discoEvt = (DiscoveryEvent)evt; + + UUID nodeId = discoEvt.eventNode().id(); + + // Wait some time in case there are some unprocessed messages from failed node. + cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId)); + + if (txFinishSync != null) + txFinishSync.onNodeLeft(nodeId); + + for (TxDeadlockFuture fut : deadlockDetectFuts.values()) + fut.onNodeLeft(nodeId); + + for (Map.Entry<GridCacheVersion, Object> entry : completedVersHashMap.entrySet()) { + Object obj = entry.getValue(); + + if (obj instanceof GridCacheReturnCompletableWrapper && + nodeId.equals(((GridCacheReturnCompletableWrapper)obj).nodeId())) + removeTxReturn(entry.getKey()); + } + } + }, + EVT_NODE_FAILED, EVT_NODE_LEFT); + + this.txDeadlockDetection = new TxDeadlockDetection(cctx); + + cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); } /** {@inheritDoc} */ @@ -320,85 +301,35 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Invalidates transaction. * * @param tx Transaction. - * @return {@code True} if transaction was salvaged by this call. */ - public boolean salvageTx(IgniteInternalTx tx) { - return salvageTx(tx, false, USER_FINISH); + public void salvageTx(IgniteInternalTx tx) { + salvageTx(tx, USER_FINISH); } /** * Invalidates transaction. * * @param tx Transaction. - * @param warn {@code True} if warning should be logged. * @param status Finalization status. - * @return {@code True} if transaction was salvaged by this call. */ - private boolean salvageTx(IgniteInternalTx tx, boolean warn, IgniteInternalTx.FinalizationStatus status) { + private void salvageTx(IgniteInternalTx tx, IgniteInternalTx.FinalizationStatus status) { assert tx != null; TransactionState state = tx.state(); - if (state == ACTIVE || state == PREPARING || state == PREPARED) { - try { - if (!tx.markFinalizing(status)) { - if (log.isDebugEnabled()) - log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); - - return false; - } - - tx.systemInvalidate(true); - - tx.prepare(); - - if (tx.state() == PREPARING) { - if (log.isDebugEnabled()) - log.debug("Ignoring transaction in PREPARING state as it is currently handled " + - "by another thread: " + tx); - - return false; - } - - if (tx instanceof IgniteTxRemoteEx) { - IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx; - - rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), - Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList()); - } - - tx.commit(); - - if (warn) { - // This print out cannot print any peer-deployed entity either - // directly or indirectly. - U.warn(log, "Invalidated transaction because originating node either " + - "crashed or left grid: " + CU.txString(tx)); - } - } - catch (IgniteCheckedException ignore) { + if (state == ACTIVE || state == PREPARING || state == PREPARED || state == MARKED_ROLLBACK) { + if (!tx.markFinalizing(status)) { if (log.isDebugEnabled()) - log.debug("Optimistic failure while invalidating transaction (will rollback): " + - tx.xidVersion()); + log.debug("Will not try to commit invalidate transaction (could not mark finalized): " + tx); - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); - } - } - } - else if (state == MARKED_ROLLBACK) { - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback transaction: " + tx.xidVersion(), e); + return; } - } - return true; + tx.salvageTx(); + + if (log.isDebugEnabled()) + log.debug("Invalidated transaction because originating node left grid: " + CU.txString(tx)); + } } /** @@ -442,7 +373,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return {@code True} if transaction has been committed or rolled back, * {@code false} otherwise. */ - public boolean isCompleted(IgniteInternalTx tx) { + private boolean isCompleted(IgniteInternalTx tx) { boolean completed = completedVersHashMap.containsKey(tx.xidVersion()); // Need check that for tx with timeout rollback message was not received before lock. @@ -461,7 +392,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param txSize Expected transaction size. * @return New transaction. */ - public IgniteTxLocalAdapter newTx( + public GridNearTxLocal newTx( boolean implicit, boolean implicitSingle, @Nullable GridCacheContext sysCacheCtx, @@ -672,13 +603,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @param cctx Cache context. * @return Transaction for current thread. */ - @SuppressWarnings({"unchecked"}) - public <T> T threadLocalTx(GridCacheContext cctx) { + public GridNearTxLocal threadLocalTx(GridCacheContext cctx) { IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); - return tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit() ? (T)tx : null; + if (tx != null && tx.local() && (!tx.dht() || tx.colocated()) && !tx.implicit()) { + assert tx instanceof GridNearTxLocal : tx; + + return (GridNearTxLocal)tx; + } + + return null; } /** @@ -747,48 +684,53 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Local transaction. */ @Nullable public IgniteInternalTx localTxx() { - IgniteInternalTx tx = txx(); + IgniteInternalTx tx = tx(); return tx != null && tx.local() ? tx : null; } /** - * @return Transaction for current thread. - */ - @SuppressWarnings({"unchecked"}) - public IgniteInternalTx txx() { - return tx(); - } - - /** * @return User transaction for current thread. */ - @Nullable public IgniteInternalTx userTx() { + @Nullable public GridNearTxLocal userTx() { IgniteInternalTx tx = txContext(); - if (tx != null && tx.user() && tx.state() == ACTIVE) - return tx; + if (activeUserTx(tx)) + return (GridNearTxLocal)tx; tx = tx(null, Thread.currentThread().getId()); - return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; + if (activeUserTx(tx)) + return (GridNearTxLocal)tx; + + return null; } /** + * @param cctx Cache context. * @return User transaction for current thread. */ - @Nullable public IgniteInternalTx userTx(GridCacheContext cctx) { + @Nullable GridNearTxLocal userTx(GridCacheContext cctx) { IgniteInternalTx tx = tx(cctx, Thread.currentThread().getId()); - return tx != null && tx.user() && tx.state() == ACTIVE ? tx : null; + if (activeUserTx(tx)) + return (GridNearTxLocal)tx; + + return null; } /** - * @return User transaction. + * @param tx Transaction. + * @return {@code True} if given transaction is explicitly started user transaction. */ - @SuppressWarnings({"unchecked"}) - @Nullable public <T extends IgniteTxLocalEx> T userTxx() { - return (T)userTx(); + private boolean activeUserTx(@Nullable IgniteInternalTx tx) { + if (tx != null && tx.user() && tx.state() == ACTIVE) { + assert tx instanceof GridNearTxLocal : tx; + + return true; + } + + return false; } /** @@ -1241,7 +1183,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 6. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 7. Remove obsolete entries from cache. removeObsolete(tx); @@ -1314,7 +1256,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 4. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 5. Remove obsolete entries. removeObsolete(tx); @@ -1364,7 +1306,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (txIdMap.remove(tx.xidVersion(), tx)) { // 1. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 2. Evict near entries. if (!tx.readMap().isEmpty()) { @@ -1400,7 +1342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * * @param tx Tx to uncommit. */ - public void uncommitTx(IgniteInternalTx tx) { + void uncommitTx(IgniteInternalTx tx) { assert tx != null; if (log.isDebugEnabled()) @@ -1417,15 +1359,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { unlockMultiple(tx, tx.readEntries()); // 3. Notify evictions. - notifyEvitions(tx); + notifyEvictions(tx); // 4. Remove from per-thread storage. clearThreadMap(tx); // 5. Unregister explicit locks. - if (!tx.alternateVersions().isEmpty()) + if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); + } // 6. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) @@ -1481,7 +1424,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * @param tx Transaction to notify evictions for. */ - private void notifyEvitions(IgniteInternalTx tx) { + private void notifyEvictions(IgniteInternalTx tx) { if (tx.internal()) return; @@ -1981,10 +1924,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } - if (tx instanceof GridDistributedTxRemoteAdapter) { + if (tx instanceof IgniteTxRemoteEx) { IgniteTxRemoteEx rmtTx = (IgniteTxRemoteEx)tx; - rmtTx.doneRemote(tx.xidVersion(), Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(), + rmtTx.doneRemote(tx.xidVersion(), + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList()); } @@ -2058,43 +2003,27 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } - if (supportsDeadlockDetection(node)) { - TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); + TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); - try { - if (!cctx.localNodeId().equals(nodeId)) - req.prepareMarshal(cctx); - - cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL); - } - catch (IgniteCheckedException e) { - if (e instanceof ClusterTopologyCheckedException) { - if (log.isDebugEnabled()) - log.debug("Failed to finish deadlock detection, node left: " + nodeId); - } - else - U.warn(log, "Failed to finish deadlock detection: " + e, e); + try { + if (!cctx.localNodeId().equals(nodeId)) + req.prepareMarshal(cctx); - fut.onDone(); - } + cctx.gridIO().sendToGridTopic(node, TOPIC_TX, req, SYSTEM_POOL); } - else { - if (log.isDebugEnabled()) - log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node); + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) { + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node left: " + nodeId); + } + else + U.warn(log, "Failed to finish deadlock detection: " + e, e); fut.onDone(); } } /** - * @param node Node. - * @return {@code True} if node supports deadlock detection protocol. - */ - private boolean supportsDeadlockDetection(ClusterNode node) { - return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0; - } - - /** * @param tx Tx. * @param txKeys Tx keys. * @return {@code True} if key is involved into tx. @@ -2265,7 +2194,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param ver Version to ack. */ public void sendDeferredAckResponse(UUID nodeId, GridCacheVersion ver) { - deferredAckMessageSender.sendDeferredAckMessage(nodeId, ver); + deferredAckMsgSnd.sendDeferredAckMessage(nodeId, ver); } /** @@ -2314,9 +2243,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { ", failedNodeId=" + evtNodeId + ']'); for (final IgniteInternalTx tx : txs()) { - if ((tx.near() && !tx.local()) || (tx.storeUsed() && tx.masterNodeIds().contains(evtNodeId))) { + if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) { // Invalidate transactions. - salvageTx(tx, false, RECOVERY_FINISH); + salvageTx(tx, RECOVERY_FINISH); } else { // Check prepare only if originating node ID failed. Otherwise parent node will finish this tx. http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 8ceca3f..87cc7cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** @@ -25,6 +26,16 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; */ public interface IgniteTxRemoteEx extends IgniteInternalTx { /** + * @throws IgniteCheckedException If failed. + */ + public void commitRemoteTx() throws IgniteCheckedException; + + /** + * + */ + public void rollbackRemoteTx(); + + /** * @param baseVer Base version. * @param committedVers Committed version. * @param rolledbackVers Rolled back version. http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java index 1c2ccbe..3c27bad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java @@ -88,7 +88,7 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index c121b1b..822e44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -104,7 +104,7 @@ public interface IgniteTxState { * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with * store enabled. */ - public boolean storeUsed(GridCacheSharedContext cctx); + public boolean storeWriteThrough(GridCacheSharedContext cctx); /** * @param cctx Context. http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index 76751de..399eea3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -289,14 +289,14 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { if (!activeCacheIds.isEmpty()) { for (int i = 0; i < activeCacheIds.size(); i++) { int cacheId = (int)activeCacheIds.get(i); CacheStoreManager store = cctx.cacheContext(cacheId).store(); - if (store.configured()) + if (store.configured() && store.isWriteThrough()) return true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 6134b9f..8ffec00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -51,7 +52,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza /** Wrapped transaction. */ @GridToStringInclude - private IgniteInternalTx tx; + private GridNearTxLocal tx; /** Gateway. */ @GridToStringExclude @@ -75,7 +76,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza * @param cctx Shared context. * @param async Async flag. */ - public TransactionProxyImpl(IgniteInternalTx tx, GridCacheSharedContext<K, V> cctx, boolean async) { + public TransactionProxyImpl(GridNearTxLocal tx, GridCacheSharedContext<K, V> cctx, boolean async) { assert tx != null; assert cctx != null; @@ -87,7 +88,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza /** * @return Transaction. */ - public IgniteInternalTx tx() { + public GridNearTxLocal tx() { return tx; } @@ -316,7 +317,9 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) { IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() { @Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException { - return fut.get().proxy(); + fut.get(); + + return TransactionProxyImpl.this; } }); @@ -330,7 +333,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - tx = (IgniteInternalTx)in.readObject(); + tx = (GridNearTxLocal)in.readObject(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 96644a3..0420182 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -67,7 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheInternal; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; @@ -95,8 +95,8 @@ import static org.apache.ignite.internal.processors.datastructures.DataStructure import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.ATOMIC_STAMPED; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.COUNT_DOWN_LATCH; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.QUEUE; -import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.REENTRANT_LOCK; +import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SEMAPHORE; import static org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.DataStructureType.SET; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -342,7 +342,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seqVal = cast(dsView.get(key), GridCacheAtomicSequenceValue.class); // Check that sequence hasn't been created in other thread yet. @@ -471,7 +471,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = cast(dsView.get(key), GridCacheAtomicLongValue.class); // Check that atomic long hasn't been created in other thread yet. @@ -551,7 +551,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { if (!create) return c.applyx(); - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { IgniteCheckedException err = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddAtomicProcessor(dsInfo)).get(); @@ -623,7 +623,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { retryTopologySafe(new IgniteOutClosureX<Void>() { @Override public Void applyx() throws IgniteCheckedException { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { T2<Boolean, IgniteCheckedException> res = utilityCache.invoke(DATA_STRUCTURES_KEY, new RemoveDataStructureProcessor(dsInfo)).get(); @@ -682,7 +682,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue val = cast(dsView.get(key), GridCacheAtomicReferenceValue.class); @@ -786,7 +786,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue val = cast(dsView.get(key), GridCacheAtomicStampedValue.class); @@ -1033,7 +1033,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return retryTopologySafe(new IgniteOutClosureX<T>() { @Override public T applyx() throws IgniteCheckedException { - try (IgniteInternalTx tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = utilityCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { T2<String, IgniteCheckedException> res = utilityCache.invoke(DATA_STRUCTURES_KEY, new AddCollectionProcessor(dsInfo)).get(); @@ -1133,7 +1133,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); // Check that count down hasn't been created in other thread yet. @@ -1198,7 +1198,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheCountDownLatchValue val = cast(dsView.get(key), GridCacheCountDownLatchValue.class); @@ -1254,7 +1254,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class); // Check that semaphore hasn't been created in other thread yet. @@ -1319,7 +1319,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheSemaphoreState val = cast(dsView.get(key), GridCacheSemaphoreState.class); @@ -1371,7 +1371,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); // Check that reentrant lock hasn't been created in other thread yet. @@ -1438,7 +1438,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.gate().enter(); - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. GridCacheLockState val = cast(dsView.get(key), GridCacheLockState.class); @@ -1474,7 +1474,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { return CU.outTx( new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { // Check correctness type of removable object. R val = cast(dsView.get(key), cls); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index dfd2122..640b72d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -90,7 +90,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #incrementAndGet()}. */ private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -117,7 +117,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndIncrement()}. */ private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -144,7 +144,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #decrementAndGet()}. */ private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -171,7 +171,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Callable for {@link #getAndDecrement()}. */ private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -430,7 +430,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalAddAndGet(final long l) { return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -464,7 +464,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndAdd(final long l) { return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -498,7 +498,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalGetAndSet(final long l) { return new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) @@ -534,7 +534,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) { return new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicLongValue val = atomicView.get(key); if (val == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 448dd8b..6911b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -213,7 +213,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef private Callable<Boolean> internalSet(final T val) { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); if (ref == null) @@ -247,7 +247,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) { return retryTopologySafe(new Callable<T>() { @Override public T call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); if (ref == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 754d8f5..87aae8b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -35,7 +35,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; @@ -486,7 +486,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc private Callable<Long> internalUpdate(final long l, final boolean updated) { return retryTopologySafe(new Callable<Long>() { @Override public Long call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seq = seqView.get(key); checkRemoved(); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 6ac303c..14f80e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -29,7 +29,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -267,7 +267,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt private Callable<Boolean> internalSet(final T val, final S stamp) { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); if (stmp == null) @@ -305,7 +305,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt final IgniteClosure<S, S> newStampClos) { return retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); if (stmp == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 723fb55..45c3677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -32,7 +32,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -282,7 +282,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc internalLatch = CU.outTx( retryTopologySafe(new Callable<CountDownLatch>() { @Override public CountDownLatch call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue val = latchView.get(key); if (val == null) { @@ -407,7 +407,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public Integer call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheCountDownLatchValue latchVal = latchView.get(key); if (latchVal == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index 1cf78fa..5f0cb44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -49,7 +49,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -520,8 +520,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { - + try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); if (val == null) @@ -614,7 +613,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); if (val == null) @@ -711,7 +710,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); if (val == null) @@ -1089,7 +1088,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable sync = CU.outTx( retryTopologySafe(new Callable<Sync>() { @Override public Sync call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); if (val == null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index a11c79d..a1c0515 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -40,7 +40,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgnitionEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -285,7 +285,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter return CU.outTx( retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, + try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ) ) { @@ -359,7 +359,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter retryTopologySafe(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try ( - IgniteInternalTx tx = CU.txStartInternal(ctx, + GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ) ) { @@ -454,7 +454,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter sync = CU.outTx( retryTopologySafe(new Callable<Sync>() { @Override public Sync call() throws Exception { - try (IgniteInternalTx tx = CU.txStartInternal(ctx, + try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheSemaphoreState val = semView.get(key); @@ -465,7 +465,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter return null; } - final int count = val.getCount(); + final int cnt = val.getCount(); Map<UUID, Integer> waiters = val.getWaiters(); @@ -473,7 +473,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter tx.commit(); - return new Sync(count, waiters, failoverSafe); + return new Sync(cnt, waiters, failoverSafe); } } }), @@ -676,7 +676,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter retryTopologySafe(new Callable<Integer>() { @Override public Integer call() throws Exception { try ( - IgniteInternalTx tx = CU.txStartInternal(ctx, + GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ) ) { GridCacheSemaphoreState val = semView.get(key); @@ -684,11 +684,11 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Exter if (val == null) throw new IgniteException("Failed to find semaphore with given name: " + name); - int count = val.getCount(); + int cnt = val.getCount(); tx.rollback(); - return count; + return cnt; } } }), http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 7b80765..846eb69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -25,7 +25,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -57,7 +57,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> @Override public Boolean call() throws Exception { boolean retVal; - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, 1)).get(); if (idx != null) { @@ -97,7 +97,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> T retVal; while (true) { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Long idx = (Long)cache.invoke(queueKey, new PollProcessor(id)).get(); if (idx != null) { @@ -143,7 +143,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> @Override public Boolean call() throws Exception { boolean retVal; - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Long idx = (Long)cache.invoke(queueKey, new AddProcessor(id, items.size())).get(); if (idx != null) { @@ -188,7 +188,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T> try { retryTopologySafe(new Callable<Object>() { @Override public Object call() throws Exception { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Long idx = (Long)cache.invoke(queueKey, new RemoveProcessor(id, rmvIdx)).get(); if (idx != null) { http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index 74fc175..acd0a1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -17,7 +17,31 @@ package org.apache.ignite.internal.processors.igfs; +import java.io.DataInput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -41,7 +65,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters; import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor; import org.apache.ignite.internal.processors.task.GridInternal; @@ -58,31 +82,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import javax.cache.processor.EntryProcessor; -import javax.cache.processor.MutableEntry; -import java.io.DataInput; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS; @@ -654,7 +653,7 @@ public class IgfsDataManager extends IgfsManager { // Need to check if block is partially written. // If so, must update it in pessimistic transaction. if (block.length != fileInfo.blockSize()) { - try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key)); byte[] val = vals.get(colocatedKey); @@ -1062,7 +1061,7 @@ public class IgfsDataManager extends IgfsManager { IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null, colocatedKey.evictExclude(), colocatedKey.blockId()); - try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + try (GridNearTxLocal tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { // Lock keys. Map<IgfsBlockKey, byte[]> vals = dataCachePrj.getAll(F.asList(colocatedKey, key));
