http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index a0f28c5..f9a6353 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -38,7 +38,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteReducer; @@ -160,8 +159,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends * * @param txMapping Transaction mapping. */ - protected final void checkOnePhase(GridDhtTxMapping txMapping) { - if (tx.storeUsed()) + final void checkOnePhase(GridDhtTxMapping txMapping) { + if (tx.storeWriteThrough()) return; Map<UUID, Collection<UUID>> map = txMapping.transactionNodes();
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index ffeeb51..5b0807f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -103,7 +103,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { public GridNearTxPrepareRequest( IgniteUuid futId, AffinityTopologyVersion topVer, - IgniteInternalTx tx, + GridNearTxLocal tx, long timeout, Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 4f4be57..c961f6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -115,7 +115,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { ctx, nodeId, xidVer, - commitVer, + commitVer, sys, plc, concurrency, @@ -289,7 +289,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { * * @param key Evicted key. */ - public void addEvicted(IgniteTxKey key) { + void addEvicted(IgniteTxKey key) { evicted.add(key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index d448446..7a69a6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -27,10 +27,12 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.UUID; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreSession; @@ -61,11 +63,17 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -904,7 +912,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private static class SessionData { /** */ @GridToStringExclude - private final IgniteInternalTx tx; + private final TxProxy tx; /** */ private String cacheName; @@ -914,7 +922,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private Map<Object, Object> props; /** */ - private Object attachment; + private Object attach; /** */ private final Set<CacheStoreManager> started = @@ -927,8 +935,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @param tx Current transaction. * @param cacheName Cache name. */ - private SessionData(@Nullable IgniteInternalTx tx, @Nullable String cacheName) { - this.tx = tx; + private SessionData(@Nullable final IgniteInternalTx tx, @Nullable String cacheName) { + this.tx = tx != null ? new TxProxy(tx) : null; this.cacheName = cacheName; } @@ -936,7 +944,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @return Transaction. */ @Nullable private Transaction transaction() { - return tx != null ? tx.proxy() : null; + return tx; } /** @@ -950,12 +958,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } /** - * @param attachment Attachment. + * @param attach Attachment. */ - private Object attach(Object attachment) { - Object prev = this.attachment; + private Object attach(Object attach) { + Object prev = this.attach; - this.attachment = attachment; + this.attach = attach; return prev; } @@ -964,7 +972,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt * @return Attachment. */ private Object attachment() { - return attachment; + return attach; } /** @@ -998,7 +1006,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** {@inheritDoc} */ @Override public String toString() { - return S.toString(SessionData.class, this, "tx", CU.txString(tx)); + return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? tx.tx : null)); } } @@ -1298,4 +1306,116 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt } } } + + /** + * + */ + private static class TxProxy implements Transaction { + /** */ + private final IgniteInternalTx tx; + + /** + * @param tx Transaction. + */ + TxProxy(IgniteInternalTx tx) { + assert tx != null; + + this.tx = tx; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid xid() { + return tx.xid(); + } + + /** {@inheritDoc} */ + @Override public UUID nodeId() { + return tx.nodeId(); + } + + /** {@inheritDoc} */ + @Override public long threadId() { + return tx.threadId(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return tx.startTime(); + } + + /** {@inheritDoc} */ + @Override public TransactionIsolation isolation() { + return tx.isolation(); + } + + /** {@inheritDoc} */ + @Override public TransactionConcurrency concurrency() { + return tx.concurrency(); + } + + /** {@inheritDoc} */ + @Override public boolean implicit() { + return tx.implicit(); + } + + /** {@inheritDoc} */ + @Override public boolean isInvalidate() { + return tx.isInvalidate(); + } + + /** {@inheritDoc} */ + @Override public TransactionState state() { + return tx.state(); + } + + /** {@inheritDoc} */ + @Override public long timeout() { + return tx.timeout(); + } + + /** {@inheritDoc} */ + @Override public long timeout(long timeout) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean setRollbackOnly() { + return tx.setRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public boolean isRollbackOnly() { + return tx.isRollbackOnly(); + } + + /** {@inheritDoc} */ + @Override public void commit() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void rollback() throws IgniteException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public IgniteAsyncSupport withAsync() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index dd900fe..7598003 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -21,8 +21,6 @@ import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -35,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; -import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -45,7 +42,7 @@ import org.jetbrains.annotations.Nullable; /** * Transaction managed by cache ({@code 'Ex'} stands for external). */ -public interface IgniteInternalTx extends AutoCloseable { +public interface IgniteInternalTx { /** * */ @@ -57,9 +54,6 @@ public interface IgniteInternalTx extends AutoCloseable { /** Transaction is being finalized by user. */ USER_FINISH, - /** Recovery request is received, user finish requests should be ignored. */ - RECOVERY_WAIT, - /** Transaction is being finalized by recovery procedure. */ RECOVERY_FINISH } @@ -183,29 +177,6 @@ public interface IgniteInternalTx extends AutoCloseable { public boolean isRollbackOnly(); /** - * Commits this transaction by initiating {@code two-phase-commit} process. - * - * @throws IgniteCheckedException If commit failed. - */ - @IgniteAsyncSupported - public void commit() throws IgniteCheckedException; - - /** - * Ends the transaction. Transaction will be rolled back if it has not been committed. - * - * @throws IgniteCheckedException If transaction could not be gracefully ended. - */ - @Override public void close() throws IgniteCheckedException; - - /** - * Rolls back this transaction. - * - * @throws IgniteCheckedException If rollback failed. - */ - @IgniteAsyncSupported - public void rollback() throws IgniteCheckedException; - - /** * Removes metadata by key. * * @param key Key of the metadata to remove. @@ -248,7 +219,7 @@ public interface IgniteInternalTx extends AutoCloseable { * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with * store enabled. */ - public boolean storeUsed(); + public boolean storeWriteThrough(); /** * Checks if this is system cache transaction. System transactions are isolated from user transactions @@ -306,11 +277,6 @@ public interface IgniteInternalTx extends AutoCloseable { public boolean empty(); /** - * @return {@code True} if preparing flag was set with this call. - */ - public boolean markPreparing(); - - /** * @param status Finalization status to set. * @return {@code True} if could mark was set. */ @@ -405,11 +371,6 @@ public interface IgniteInternalTx extends AutoCloseable { public boolean local(); /** - * @return {@code True} if transaction is replicated. - */ - public boolean replicated(); - - /** * @return Subject ID initiated this transaction. */ public UUID subjectId(); @@ -432,11 +393,6 @@ public interface IgniteInternalTx extends AutoCloseable { public boolean user(); /** - * @return Transaction write synchronization mode. - */ - public CacheWriteSynchronizationMode syncMode(); - - /** * @param key Key to check. * @return {@code True} if key is present. */ @@ -524,18 +480,9 @@ public interface IgniteInternalTx extends AutoCloseable { public void commitVersion(GridCacheVersion commitVer); /** - * Prepare state. - * - * @throws IgniteCheckedException If failed. - */ - public void prepare() throws IgniteCheckedException; - - /** - * Prepare stage. - * - * @return Future for prepare step. + * @return Future. */ - public IgniteInternalFuture<?> prepareAsync(); + @Nullable public IgniteInternalFuture<?> salvageTx(); /** * @param endVer End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>) @@ -669,7 +616,8 @@ public interface IgniteInternalTx extends AutoCloseable { * @param committed Committed transactions relative to base. * @param rolledback Rolled back transactions relative to base. */ - public void completedVersions(GridCacheVersion base, Collection<GridCacheVersion> committed, + public void completedVersions(GridCacheVersion base, + Collection<GridCacheVersion> committed, Collection<GridCacheVersion> rolledback); /** @@ -683,22 +631,6 @@ public interface IgniteInternalTx extends AutoCloseable { public boolean onePhaseCommit(); /** - * @return {@code True} if transaction has transform entries. This flag will be only set for local - * transactions. - */ - public boolean hasTransforms(); - - /** - * @return Public API proxy. - */ - public TransactionProxy proxy(); - - /** - * @param topVer New topology version. - */ - public void onRemap(AffinityTopologyVersion topVer); - - /** * @param e Commit error. */ public void commitError(Throwable e); http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 7c7b5a8..ddafbac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -22,6 +22,7 @@ import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.IgniteTransactionsEx; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.transactions.Transaction; @@ -91,7 +92,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { } /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartEx( + @Override public GridNearTxLocal txStartEx( GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation, @@ -113,7 +114,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { } /** {@inheritDoc} */ - @Override public IgniteInternalTx txStartEx( + @Override public GridNearTxLocal txStartEx( GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation) @@ -141,7 +142,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { * @return Transaction. */ @SuppressWarnings("unchecked") - private IgniteInternalTx txStart0( + private GridNearTxLocal txStart0( TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, @@ -151,11 +152,12 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { cctx.kernalContext().gateway().readLock(); try { - IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); + GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx); if (tx != null) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); + tx = cctx.tm().newTx( false, false, @@ -178,7 +180,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { /** {@inheritDoc} */ @Nullable @Override public Transaction tx() { - IgniteInternalTx tx = cctx.tm().userTx(); + GridNearTxLocal tx = cctx.tm().userTx(); return tx != null ? tx.proxy() : null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index b07a117..13ca26a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -39,7 +39,6 @@ import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -91,7 +90,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; import static org.apache.ignite.transactions.TransactionState.ACTIVE; -import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -192,12 +190,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** */ protected boolean onePhaseCommit; - /** */ - protected CacheWriteSynchronizationMode syncMode; - - /** If this transaction contains transform entries. */ - protected boolean transform; - /** Commit version. */ private volatile GridCacheVersion commitVer; @@ -207,9 +199,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** Done marker. */ protected volatile boolean isDone; - /** Preparing flag (no need for volatile modifier). */ - private boolean preparing; - /** */ @GridToStringInclude private Map<Integer, Set<Integer>> invalidParts; @@ -251,10 +240,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** Store used flag. */ protected boolean storeEnabled = true; - /** */ - @GridToStringExclude - private TransactionProxyImpl proxy; - /** * Empty constructor required for {@link Externalizable}. */ @@ -420,8 +405,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean storeUsed() { - return storeEnabled() && txState().storeUsed(cctx); + @Override public boolean storeWriteThrough() { + return storeEnabled() && txState().storeWriteThrough(cctx); } /** @@ -512,32 +497,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } } - /** {@inheritDoc} */ - @Override public void onRemap(AffinityTopologyVersion topVer) { - assert false : this; - } - - /** {@inheritDoc} */ - @Override public boolean hasTransforms() { - return transform; - } - - /** {@inheritDoc} */ - @Override public boolean markPreparing() { - synchronized (this) { - if (preparing) - return false; - - preparing = true; - - return true; - } - } - /** * @return {@code True} if marked. */ - @Override public boolean markFinalizing(FinalizationStatus status) { + @Override public final boolean markFinalizing(FinalizationStatus status) { boolean res; switch (status) { @@ -546,15 +509,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement break; - case RECOVERY_WAIT: - FINALIZING_UPD.compareAndSet(this, FinalizationStatus.NONE, FinalizationStatus.RECOVERY_WAIT); - - FinalizationStatus cur = finalizing; - - res = cur == FinalizationStatus.RECOVERY_WAIT || cur == FinalizationStatus.RECOVERY_FINISH; - - break; - case RECOVERY_FINISH: FinalizationStatus old = finalizing; @@ -564,7 +518,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement default: throw new IllegalArgumentException("Cannot set finalization status: " + status); - } if (res) { @@ -639,26 +592,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean replicated() { - return false; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode syncMode() { - if (syncMode != null) - return syncMode; - - return txState().syncMode(cctx); - } - - /** - * @param syncMode Write synchronization mode. - */ - public void syncMode(CacheWriteSynchronizationMode syncMode) { - this.syncMode = syncMode; - } - - /** {@inheritDoc} */ @Override public IgniteUuid xid() { return xidVer.asGridUuid(); } @@ -911,30 +844,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } } - /** - * - */ - @Override public void close() throws IgniteCheckedException { - TransactionState state = state(); - - if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) - rollback(); - - synchronized (this) { - try { - while (!done()) - wait(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - if (!done()) - throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + - this, e); - } - } - } - /** {@inheritDoc} */ @Override public boolean needsCompletedVersions() { return false; @@ -1190,12 +1099,12 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean isSystemInvalidate() { + @Override public final boolean isSystemInvalidate() { return sysInvalidate; } /** {@inheritDoc} */ - @Override public void systemInvalidate(boolean sysInvalidate) { + @Override public final void systemInvalidate(boolean sysInvalidate) { this.sysInvalidate = sysInvalidate; } @@ -1257,7 +1166,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement * @throws IgniteCheckedException If batch update failed. */ @SuppressWarnings({"CatchGenericClass"}) - protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { + protected final void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException { if (!storeEnabled() || internal() || (!local() && near())) // No need to work with local store at GridNearTxRemote. return; @@ -1806,14 +1715,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public TransactionProxy proxy() { - if (proxy == null) - proxy = new TransactionProxyImpl(this, cctx, false); - - return proxy; - } - - /** {@inheritDoc} */ @Override public boolean equals(Object o) { return o == this || (o instanceof IgniteTxAdapter && xidVer.equals(((IgniteTxAdapter)o).xidVer)); } @@ -1972,21 +1873,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void commit() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void close() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ - @Override public void rollback() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public boolean activeCachesDeploymentEnabled() { return false; } @@ -2017,7 +1903,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean storeUsed() { + @Override public boolean storeWriteThrough() { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2051,11 +1937,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void onRemap(AffinityTopologyVersion topVer) { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public void commitError(Throwable e) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2066,11 +1947,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean markPreparing() { - throw new IllegalStateException("Deserialized transaction can only be used as read-only."); - } - - /** {@inheritDoc} */ @Override public boolean markFinalizing(FinalizationStatus status) { throw new IllegalStateException("Deserialized transaction can only be used as read-only."); } @@ -2156,11 +2032,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean replicated() { - return false; - } - - /** {@inheritDoc} */ @Override public UUID subjectId() { return null; } @@ -2176,11 +2047,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode syncMode() { - return null; - } - - /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { return false; } @@ -2258,12 +2124,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public void prepare() throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() { + @Override public IgniteInternalFuture<?> salvageTx() { return null; } @@ -2393,16 +2254,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public boolean hasTransforms() { - return false; - } - - /** {@inheritDoc} */ - @Override public TransactionProxy proxy() { - return null; - } - - /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 14a7ed0..e1d12af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -423,7 +423,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** * @param val Value to set. */ - void setAndMarkValid(CacheObject val) { + public void setAndMarkValid(CacheObject val) { setAndMarkValid(op(), val, this.val.hasWriteValue(), this.val.hasReadValue()); } @@ -451,7 +451,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { * Marks this entry as value-has-bean-read. Effectively, makes values enlisted to transaction visible * to further peek operations. */ - void markValid() { + public void markValid() { prevVal.value(val.op(), val.value(), val.hasWriteValue(), val.hasReadValue()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 56a7fa2..be48de7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -255,7 +255,7 @@ public class IgniteTxHandler { req.last()); if (locTx.isRollbackOnly()) - locTx.rollbackAsync(); + locTx.rollbackNearTxLocalAsync(); return fut.chain(new C1<IgniteInternalFuture<GridNearTxPrepareResponse>, GridNearTxPrepareResponse>() { @Override public GridNearTxPrepareResponse apply(IgniteInternalFuture<GridNearTxPrepareResponse> f) { @@ -491,7 +491,7 @@ public class IgniteTxHandler { if (tx.isRollbackOnly() && !tx.commitOnPrepare()) { if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK) - tx.rollbackAsync(); + tx.rollbackDhtLocalAsync(); } final GridDhtTxLocal tx0 = tx; @@ -711,8 +711,10 @@ public class IgniteTxHandler { * @param req Request. * @return Future. */ - @Nullable public IgniteInternalFuture<IgniteInternalTx> finish(UUID nodeId, @Nullable GridNearTxLocal locTx, - GridNearTxFinishRequest req) { + @Nullable public IgniteInternalFuture<IgniteInternalTx> finish(UUID nodeId, + @Nullable GridNearTxLocal locTx, + GridNearTxFinishRequest req) + { assert nodeId != null; assert req != null; @@ -763,8 +765,10 @@ public class IgniteTxHandler { * @param req Finish request. * @return Finish future. */ - private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, @Nullable GridNearTxLocal locTx, - GridNearTxFinishRequest req) { + private IgniteInternalFuture<IgniteInternalTx> finishDhtLocal(UUID nodeId, + @Nullable GridNearTxLocal locTx, + GridNearTxFinishRequest req) + { GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); GridDhtTxLocal tx = null; @@ -845,10 +849,11 @@ public class IgniteTxHandler { assert req.syncMode() != null : req; tx.syncMode(req.syncMode()); + tx.nearFinishFutureId(req.futureId()); + tx.nearFinishMiniId(req.miniId()); + tx.storeEnabled(req.storeEnabled()); if (req.commit()) { - tx.storeEnabled(req.storeEnabled()); - if (!tx.markFinalizing(USER_FINISH)) { if (log.isDebugEnabled()) log.debug("Will not finish transaction (it is handled by another thread): " + tx); @@ -856,10 +861,7 @@ public class IgniteTxHandler { return null; } - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - - IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitAsync(); + IgniteInternalFuture<IgniteInternalTx> commitFut = tx.commitDhtLocalAsync(); // Only for error logging. commitFut.listen(CU.errorLogger(log)); @@ -867,10 +869,7 @@ public class IgniteTxHandler { return commitFut; } else { - tx.nearFinishFutureId(req.futureId()); - tx.nearFinishMiniId(req.miniId()); - - IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync(); // Only for error logging. rollbackFut.listen(CU.errorLogger(log)); @@ -887,7 +886,7 @@ public class IgniteTxHandler { IgniteInternalFuture<IgniteInternalTx> res; - IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); + IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackDhtLocalAsync(); // Only for error logging. rollbackFut.listen(CU.errorLogger(log)); @@ -928,7 +927,7 @@ public class IgniteTxHandler { throw e; if (tx != null) - return tx.rollbackAsync(); + return tx.rollbackNearTxLocalAsync(); return new GridFinishedFuture<>(e); } @@ -1011,7 +1010,7 @@ public class IgniteTxHandler { U.error(log, "Failed to process prepare request: " + req, e); if (nearTx != null) - nearTx.rollback(); + nearTx.rollbackRemoteTx(); res = new GridDhtTxPrepareResponse( req.partition(), @@ -1153,7 +1152,7 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { sendReply(nodeId, req, true, nearTxId); } }); @@ -1212,12 +1211,12 @@ public class IgniteTxHandler { tx.setPartitionUpdateCounters( req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null); - tx.commit(); + tx.commitRemoteTx(); } else { tx.doneRemote(req.baseVersion(), null, null, null); - tx.rollback(); + tx.rollbackRemoteTx(); } } catch (Throwable e) { @@ -1228,7 +1227,7 @@ public class IgniteTxHandler { tx.systemInvalidate(true); try { - tx.commit(); + tx.commitRemoteTx(); } catch (IgniteCheckedException ex) { U.error(log, "Failed to invalidate transaction: " + tx, ex); @@ -1255,7 +1254,7 @@ public class IgniteTxHandler { // Complete remote candidates. tx.doneRemote(req.version(), null, null, null); - tx.commit(); + tx.commitRemoteTx(); } catch (IgniteTxHeuristicCheckedException e) { // Just rethrow this exception. Transaction was already uncommitted. @@ -1268,7 +1267,7 @@ public class IgniteTxHandler { tx.invalidate(true); tx.systemInvalidate(true); - tx.rollback(); + tx.rollbackRemoteTx(); if (e instanceof Error) throw (Error)e; @@ -1282,7 +1281,7 @@ public class IgniteTxHandler { * @param dhtTx Dht tx. * @param nearTx Near tx. */ - protected void sendReply(UUID nodeId, + private void sendReply(UUID nodeId, GridDhtTxPrepareRequest req, GridDhtTxPrepareResponse res, GridDhtTxRemote dhtTx, @@ -1314,10 +1313,10 @@ public class IgniteTxHandler { } if (nearTx != null) - nearTx.rollback(); + nearTx.rollbackRemoteTx(); if (dhtTx != null) - dhtTx.rollback(); + dhtTx.rollbackRemoteTx(); } } @@ -1329,7 +1328,7 @@ public class IgniteTxHandler { * @param committed {@code True} if transaction committed on this node. * @param nearTxId Near tx version. */ - protected final void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { + private void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed, GridCacheVersion nearTxId) { if (req.replyRequired() || req.checkCommitted()) { GridDhtTxFinishResponse res = new GridDhtTxFinishResponse( req.partition(), @@ -1551,21 +1550,19 @@ public class IgniteTxHandler { // Prepare prior to reordering, so the pending locks added // in prepare phase will get properly ordered as well. - tx.prepare(); + tx.prepareRemoteTx(); if (req.last()) { assert !F.isEmpty(req.transactionNodes()) : "Received last prepare request with empty transaction nodes: " + req; - tx.transactionNodes(req.transactionNodes()); - tx.state(PREPARED); } res.invalidPartitionsByCacheId(tx.invalidPartitions()); if (tx.empty() && req.last()) { - tx.rollback(); + tx.rollbackRemoteTx(); return null; } @@ -1644,7 +1641,7 @@ public class IgniteTxHandler { // Prepare prior to reordering, so the pending locks added // in prepare phase will get properly ordered as well. - tx.prepare(); + tx.prepareRemoteTx(); if (req.last()) tx.state(PREPARED); @@ -1659,7 +1656,7 @@ public class IgniteTxHandler { * @param nodeId Node ID. * @param req Request. */ - protected void processCheckPreparedTxRequest(final UUID nodeId, + private void processCheckPreparedTxRequest(final UUID nodeId, final GridCacheTxRecoveryRequest req) { if (txRecoveryMsgLog.isDebugEnabled()) { txRecoveryMsgLog.debug("Received tx recovery request [txId=" + req.nearXidVersion() + http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index bffb295..9417e1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -154,13 +154,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter { } /** {@inheritDoc} */ - @Override public boolean storeUsed(GridCacheSharedContext cctx) { + @Override public boolean storeWriteThrough(GridCacheSharedContext cctx) { if (cacheCtx == null) return false; CacheStoreManager store = cacheCtx.store(); - return store.configured(); + return store.configured() && store.isWriteThrough(); } /** {@inheritDoc} */
