ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee955df9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee955df9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee955df9 Branch: refs/heads/ignite-2.0 Commit: ee955df9fb80737292aac5f7ad3c82f8f0d8ea8e Parents: f440480 Author: sboikov <sboi...@gridgain.com> Authored: Thu Apr 20 13:10:28 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Apr 20 13:10:28 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 4 +- .../processors/cache/GridCacheUtils.java | 117 ++-- .../datastructures/DataStructuresProcessor.java | 61 +- .../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++-------- .../GridCacheAtomicReferenceImpl.java | 276 ++++---- .../GridCacheAtomicSequenceImpl.java | 88 +-- .../GridCacheAtomicStampedImpl.java | 293 ++++----- .../GridCacheCountDownLatchImpl.java | 56 +- .../datastructures/GridCacheLockImpl.java | 80 +-- .../datastructures/GridCacheQueueProxy.java | 292 +-------- .../datastructures/GridCacheSemaphoreImpl.java | 56 +- .../datastructures/GridCacheSetProxy.java | 152 +---- .../GridTransactionalCacheQueueImpl.java | 8 +- 13 files changed, 812 insertions(+), 1297 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a3d4c81..5438163 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2451,7 +2451,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { - @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx) + @Override public EntryProcessorResult<T> op(GridNearTxLocal tx) throws IgniteCheckedException { assert topVer == null || tx.implicit(); @@ -2489,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) + @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 5abb6de..df9c7c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -881,31 +881,6 @@ public class GridCacheUtils { } /** - * Method executes any Callable out of scope of transaction. - * If transaction started by this thread {@code cmd} will be executed in another thread. - * - * @param cmd Callable. - * @param ctx Cache context. - * @return T Callable result. - * @throws IgniteCheckedException If execution failed. - */ - public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException { - if (ctx.tm().inUserTx()) - return ctx.closures().callLocalSafe(cmd, false).get(); - else { - try { - return cmd.call(); - } - catch (IgniteCheckedException | IgniteException | IllegalStateException e) { - throw e; - } - catch (Exception e) { - throw new IgniteCheckedException(e); - } - } - } - - /** * @param val Value. * @param skip Skip value flag. * @return Value. @@ -1604,56 +1579,58 @@ public class GridCacheUtils { /** * @param c Closure to retry. - * @param <S> Closure type. - * @return Wrapped closure. - */ - public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) { - return new Callable<S>() { - @Override public S call() throws Exception { - IgniteCheckedException err = null; - - for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { - try { - return c.call(); - } - catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) { + * @throws IgniteCheckedException If failed. + * @return Closure result. + */ + public static <S> S retryTopologySafe(final Callable<S> c) throws IgniteCheckedException { + IgniteCheckedException err = null; + + for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) { + try { + return c.call(); + } + catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) { + throw e; + } + catch (TransactionRollbackException e) { + if (i + 1 == GridCacheAdapter.MAX_RETRIES) + throw e; + + U.sleep(1); + } + catch (IgniteCheckedException e) { + if (i + 1 == GridCacheAdapter.MAX_RETRIES) + throw e; + + if (X.hasCause(e, ClusterTopologyCheckedException.class)) { + ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); + + if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof + ClusterTopologyServerNotFoundException) throw e; - } - catch (TransactionRollbackException e) { - if (i + 1 == GridCacheAdapter.MAX_RETRIES) - throw e; + // IGNITE-1948: remove this check when the issue is fixed + if (topErr.retryReadyFuture() != null) + topErr.retryReadyFuture().get(); + else U.sleep(1); - } - catch (IgniteCheckedException e) { - if (i + 1 == GridCacheAdapter.MAX_RETRIES) - throw e; - - if (X.hasCause(e, ClusterTopologyCheckedException.class)) { - ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); - - if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof - ClusterTopologyServerNotFoundException) - throw e; - - // IGNITE-1948: remove this check when the issue is fixed - if (topErr.retryReadyFuture() != null) - topErr.retryReadyFuture().get(); - else - U.sleep(1); - } - else if (X.hasCause(e, IgniteTxRollbackCheckedException.class, - CachePartialUpdateCheckedException.class)) - U.sleep(1); - else - throw e; - } } - - // Should never happen. - throw err; + else if (X.hasCause(e, IgniteTxRollbackCheckedException.class, + CachePartialUpdateCheckedException.class)) + U.sleep(1); + else + throw e; } - }; + catch (RuntimeException e) { + throw e; + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + + // Should never happen. + throw err; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 102db96..0a439dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -476,7 +476,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * @param name Sequence name. * @throws IgniteCheckedException If removing failed. */ - public final void removeSequence(final String name) throws IgniteCheckedException { + final void removeSequence(final String name) throws IgniteCheckedException { assert name != null; awaitInitialization(); @@ -488,9 +488,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen dsCacheCtx.gate().enter(); try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); - - removeInternal(key, GridCacheAtomicSequenceValue.class); + dsView.remove(new GridCacheInternalKeyImpl(name)); } finally { dsCacheCtx.gate().leave(); @@ -631,7 +629,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * @param name Atomic long name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicLong(final String name) throws IgniteCheckedException { + final void removeAtomicLong(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; @@ -642,7 +640,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen dsCacheCtx.gate().enter(); try { - removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class); + dsView.remove(new GridCacheInternalKeyImpl(name)); } finally { dsCacheCtx.gate().leave(); @@ -790,7 +788,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * @param name Atomic reference name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicReference(final String name) throws IgniteCheckedException { + final void removeAtomicReference(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; @@ -801,9 +799,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen dsCacheCtx.gate().enter(); try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); - - removeInternal(key, GridCacheAtomicReferenceValue.class); + dsView.remove(new GridCacheInternalKeyImpl(name)); } finally { dsCacheCtx.gate().leave(); @@ -894,7 +890,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen * @param name Atomic stamped name. * @throws IgniteCheckedException If removing failed. */ - public final void removeAtomicStamped(final String name) throws IgniteCheckedException { + final void removeAtomicStamped(final String name) throws IgniteCheckedException { assert name != null; assert dsCacheCtx != null; @@ -905,9 +901,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen dsCacheCtx.gate().enter(); try { - GridCacheInternal key = new GridCacheInternalKeyImpl(name); - - removeInternal(key, GridCacheAtomicStampedValue.class); + dsView.remove(new GridCacheInternalKeyImpl(name)); } finally { dsCacheCtx.gate().leave(); @@ -1516,43 +1510,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen } /** - * Remove internal entry by key from cache. - * - * @param key Internal entry key. - * @param cls Class of object which will be removed. If cached object has different type exception will be thrown. - * @return Method returns true if sequence has been removed and false if it's not cached. - * @throws IgniteCheckedException If removing failed or class of object is different to expected class. - */ - private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException { - return CU.outTx( - new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) { - // Check correctness type of removable object. - R val = cast(dsView.get(key), cls); - - if (val != null) { - dsView.remove(key); - - tx.commit(); - } - else - tx.setRollbackOnly(); - - return val != null; - } - catch (Error | Exception e) { - U.error(log, "Failed to remove data structure: " + key, e); - - throw e; - } - } - }, - dsCacheCtx - ); - } - - /** * */ static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> { @@ -1769,7 +1726,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen */ public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException { try { - return GridCacheUtils.retryTopologySafe(call).call(); + return GridCacheUtils.retryTopologySafe(call); } catch (IgniteCheckedException e) { throw e; http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index be718cf..3f07151 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -23,23 +23,20 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; -import java.util.concurrent.Callable; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - /** * Cache atomic long implementation. */ @@ -55,9 +52,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } }; - /** Logger. */ - private IgniteLogger log; - /** Atomic long name. */ private String name; @@ -76,126 +70,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign /** Cache context. */ private GridCacheContext ctx; - /** Callable for {@link #get()}. */ - private final Callable<Long> getCall = new Callable<Long>() { - @Override public Long call() throws Exception { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); - - return val.get(); - } - }; - - /** Callable for {@link #incrementAndGet()}. */ - private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); - - long retVal = val.get() + 1; - - val.set(retVal); - - atomicView.put(key, val); - - tx.commit(); - - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to increment and get: " + this, e); - - throw e; - } - } - }); - - /** Callable for {@link #getAndIncrement()}. */ - private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); - - long retVal = val.get(); - - val.set(retVal + 1); - - atomicView.put(key, val); - - tx.commit(); - - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to get and increment: " + this, e); - - throw e; - } - } - }); - - /** Callable for {@link #decrementAndGet()}. */ - private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); - - long retVal = val.get() - 1; - - val.set(retVal); - - atomicView.put(key, val); - - tx.commit(); - - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to decrement and get: " + this, e); - - throw e; - } - } - }); - - /** Callable for {@link #getAndDecrement()}. */ - private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); - - long retVal = val.get(); - - val.set(retVal - 1); - - atomicView.put(key, val); - - tx.commit(); - - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to get and decrement and get: " + this, e); - - throw e; - } - } - }); - /** * Empty constructor required by {@link Externalizable}. */ @@ -211,8 +85,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign * @param atomicView Atomic projection. * @param ctx CacheContext. */ - public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key, - IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) { + public GridCacheAtomicLongImpl(String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, + GridCacheContext ctx) { assert key != null; assert atomicView != null; assert ctx != null; @@ -222,8 +98,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign this.key = key; this.atomicView = atomicView; this.name = name; - - log = ctx.logger(getClass()); } /** {@inheritDoc} */ @@ -236,7 +110,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(getCall, ctx); + GridCacheAtomicLongValue val = atomicView.get(key); + + if (val == null) + throw new IgniteException("Failed to find atomic long: " + name); + + return val.get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -248,7 +127,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try{ - return CU.outTx(incAndGetCall, ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -260,7 +146,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(getAndIncCall, ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -272,7 +165,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(internalAddAndGet(l), ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -284,7 +184,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(internalGetAndAdd(l), ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -296,7 +203,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(decAndGetCall, ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -308,7 +222,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(getAndDecCall, ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -320,7 +241,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(internalGetAndSet(l), ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -332,7 +260,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal; + EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); + + assert res != null && res.get() != null : res; + + return res.get() == expVal; + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -348,7 +283,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign checkRemoved(); try { - return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx); + EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -422,193 +364,335 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign } } + /** {@inheritDoc} */ + @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { + this.atomicView = kctx.cache().atomicsCache(); + this.ctx = atomicView.context(); + } + + /** {@inheritDoc} */ + @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(ctx.kernalContext()); + out.writeUTF(name); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); + + t.set1((GridKernalContext)in.readObject()); + t.set2(in.readUTF()); + } + /** - * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode. + * Reconstructs object on unmarshalling. * - * @param l Value will be added to atomic long. - * @return Callable for execution in async and sync mode. + * @return Reconstructed object. + * @throws ObjectStreamException Thrown in case of unmarshalling error. */ - private Callable<Long> internalAddAndGet(final long l) { - return retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); + private Object readResolve() throws ObjectStreamException { + try { + IgniteBiTuple<GridKernalContext, String> t = stash.get(); - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + return t.get1().dataStructures().atomicLong(t.get2(), 0L, false); + } + catch (IgniteCheckedException e) { + throw U.withCause(new InvalidObjectException(e.getMessage()), e); + } + finally { + stash.remove(); + } + } - long retVal = val.get() + l; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheAtomicLongImpl.class, this); + } - val.set(retVal); + /** + * + */ + static class GetAndSetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long newVal; + + /** + * @param newVal New value. + */ + GetAndSetProcessor(long newVal) { + this.newVal = newVal; + } - atomicView.put(key, val); + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - tx.commit(); + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to add and get: " + this, e); + long curVal = val.get(); - throw e; - } - } - }); + e.setValue(new GridCacheAtomicLongValue(newVal)); + + return curVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetAndSetProcessor.class, this); + } } /** - * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode. * - * @param l Value will be added to atomic long. - * @return Callable for execution in async and sync mode. */ - private Callable<Long> internalGetAndAdd(final long l) { - return retryTopologySafe(new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); - - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + static class GetAndAddProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long delta; + + /** + * @param delta Delta. + */ + GetAndAddProcessor(long delta) { + this.delta = delta; + } - long retVal = val.get(); + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - val.set(retVal + l); + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - atomicView.put(key, val); + long curVal = val.get(); - tx.commit(); + e.setValue(new GridCacheAtomicLongValue(curVal + delta)); - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to get and add: " + this, e); + return curVal; + } - throw e; - } - } - }); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetAndAddProcessor.class, this); + } } /** - * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode. * - * @param l Value will be added to atomic long. - * @return Callable for execution in async and sync mode. */ - private Callable<Long> internalGetAndSet(final long l) { - return new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); + static class AddAndGetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long delta; + + /** + * @param delta Delta. + */ + AddAndGetProcessor(long delta) { + this.delta = delta; + } - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - long retVal = val.get(); + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - val.set(l); + long newVal = val.get() + delta; - atomicView.put(key, val); + e.setValue(new GridCacheAtomicLongValue(newVal)); - tx.commit(); + return newVal; + } - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to get and set: " + this, e); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AddAndGetProcessor.class, this); + } + } - throw e; - } - } - }; + /** + * + */ + static class CompareAndSetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final long expVal; + + /** */ + private final long newVal; + + /** + * @param expVal Expected value. + * @param newVal New value. + */ + CompareAndSetProcessor(long expVal, long newVal) { + this.expVal = expVal; + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); + + long curVal = val.get(); + + if (curVal == expVal) + e.setValue(new GridCacheAtomicLongValue(newVal)); + + return curVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CompareAndSetProcessor.class, this); + } } /** - * Method returns callable for execution {@link #compareAndSetAndGet(long, long)} - * operation in async and sync mode. * - * @param expVal Expected atomic long value. - * @param newVal New atomic long value. - * @return Callable for execution in async and sync mode. */ - private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) { - return new Callable<Long>() { - @Override public Long call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicLongValue val = atomicView.get(key); + static class GetAndIncrementProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; - if (val == null) - throw new IgniteCheckedException("Failed to find atomic long with given name: " + name); + /** */ + private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor(); - long retVal = val.get(); + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - if (retVal == expVal) { - val.set(newVal); + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - atomicView.getAndPut(key, val); + long ret = val.get(); - tx.commit(); - } + e.setValue(new GridCacheAtomicLongValue(ret + 1)); - return retVal; - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and set: " + this, e); + return ret; + } - throw e; - } - } - }; + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetAndIncrementProcessor.class, this); + } } - /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.atomicView = kctx.cache().atomicsCache(); - this.ctx = atomicView.context(); - } + /** + * + */ + static class IncrementAndGetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; - /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + /** */ + private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor(); - } + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(ctx.kernalContext()); - out.writeUTF(name); - } + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - IgniteBiTuple<GridKernalContext, String> t = stash.get(); + long newVal = val.get() + 1; - t.set1((GridKernalContext)in.readObject()); - t.set2(in.readUTF()); + e.setValue(new GridCacheAtomicLongValue(newVal)); + + return newVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IncrementAndGetProcessor.class, this); + } } /** - * Reconstructs object on unmarshalling. * - * @return Reconstructed object. - * @throws ObjectStreamException Thrown in case of unmarshalling error. */ - private Object readResolve() throws ObjectStreamException { - try { - IgniteBiTuple<GridKernalContext, String> t = stash.get(); + static class GetAndDecrementProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; - return t.get1().dataStructures().atomicLong(t.get2(), 0L, false); - } - catch (IgniteCheckedException e) { - throw U.withCause(new InvalidObjectException(e.getMessage()), e); + /** */ + private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor(); + + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); + + long ret = val.get(); + + e.setValue(new GridCacheAtomicLongValue(ret - 1)); + + return ret; } - finally { - stash.remove(); + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GetAndDecrementProcessor.class, this); } } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheAtomicLongImpl.class, this); + /** + * + */ + static class DecrementAndGetProcessor implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor(); + + /** {@inheritDoc} */ + @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) { + GridCacheAtomicLongValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name()); + + long newVal = val.get() - 1; + + e.setValue(new GridCacheAtomicLongValue(newVal)); + + return newVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DecrementAndGetProcessor.class, this); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 4365468..b7dc007 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -23,24 +23,21 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; -import java.util.concurrent.Callable; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - /** * Cache atomic reference implementation. */ @@ -56,9 +53,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } }; - /** Logger. */ - private IgniteLogger log; - /** Atomic reference name. */ private String name; @@ -77,18 +71,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef /** Cache context. */ private GridCacheContext ctx; - /** Callable for {@link #get} operation */ - private final Callable<T> getCall = new Callable<T>() { - @Override public T call() throws Exception { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); - - if (ref == null) - throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - - return ref.get(); - } - }; - /** * Empty constructor required by {@link Externalizable}. */ @@ -117,8 +99,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef this.key = key; this.atomicView = atomicView; this.name = name; - - log = ctx.logger(getClass()); } /** {@inheritDoc} */ @@ -131,7 +111,12 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef checkRemoved(); try { - return CU.outTx(getCall, ctx); + GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); + + if (ref == null) + throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); + + return ref.get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -143,7 +128,10 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef checkRemoved(); try { - CU.outTx(internalSet(val), ctx); + atomicView.invoke(key, new ReferenceSetEntryProcessor<>(val)); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -152,20 +140,42 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef /** {@inheritDoc} */ @Override public boolean compareAndSet(T expVal, T newVal) { - return compareAndSetAndGet(newVal, expVal) == expVal; + try { + EntryProcessorResult<Boolean> res = + atomicView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** * Compares current value with specified value for equality and, if they are equal, replaces current value. * * @param newVal New value to set. + * @param expVal Expected value. * @return Original value. */ public T compareAndSetAndGet(T newVal, T expVal) { checkRemoved(); try { - return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx); + EntryProcessorResult<T> res = + atomicView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal)); + + assert res != null; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -205,82 +215,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } } - /** - * Method returns callable for execution {@link #set(Object)} operation in async and sync mode. - * - * @param val Value will be set in reference . - * @return Callable for execution in async and sync mode. - */ - private Callable<Boolean> internalSet(final T val) { - return retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); - - if (ref == null) - throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - - ref.set(val); - - atomicView.put(key, ref); - - tx.commit(); - - return true; - } - catch (Error | Exception e) { - U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e); - - throw e; - } - } - }); - } - - /** - * Conditionally sets the new value. It will be set if {@code expValPred} is - * evaluate to {@code true}. - * - * @param expVal Expected value. - * @param newVal New value. - * @return Callable for execution in async and sync mode. - */ - private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) { - return retryTopologySafe(new Callable<T>() { - @Override public T call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); - - if (ref == null) - throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - - T origVal = ref.get(); - - if (!F.eq(expVal, origVal)) { - tx.setRollbackOnly(); - - return origVal; - } - else { - ref.set(newVal); - - atomicView.getAndPut(key, ref); - - tx.commit(); - - return expVal; - } - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" + - newVal + ", atomicReference" + this + ']', e); - - throw e; - } - } - }); - } - /** {@inheritDoc} */ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { this.atomicView = kctx.cache().atomicsCache(); @@ -289,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef /** {@inheritDoc} */ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { - + // No-op. } /** @@ -363,6 +297,136 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } } + /** + * + */ + static class ReferenceSetEntryProcessor<T> implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final T newVal; + + /** + * @param newVal New value. + */ + ReferenceSetEntryProcessor(T newVal) { + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e, + Object... args) { + GridCacheAtomicReferenceValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name()); + + e.setValue(new GridCacheAtomicReferenceValue<>(newVal)); + + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReferenceSetEntryProcessor.class, this); + } + } + + /** + * + */ + static class ReferenceCompareAndSetEntryProcessor<T> implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final T expVal; + + /** */ + private final T newVal; + + /** + * @param expVal Expected value. + * @param newVal New value. + */ + ReferenceCompareAndSetEntryProcessor(T expVal, T newVal) { + this.expVal = expVal; + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e, + Object... args) { + GridCacheAtomicReferenceValue<T> val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name()); + + T curVal = val.get(); + + if (F.eq(expVal, curVal)) { + e.setValue(new GridCacheAtomicReferenceValue<T>(newVal)); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReferenceCompareAndSetEntryProcessor.class, this); + } + } + + /** + * + */ + static class ReferenceCompareAndSetAndGetEntryProcessor<T> implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, T> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final T expVal; + + /** */ + private final T newVal; + + /** + * @param expVal Expected value. + * @param newVal New value. + */ + ReferenceCompareAndSetAndGetEntryProcessor(T expVal, T newVal) { + this.expVal = expVal; + this.newVal = newVal; + } + + /** {@inheritDoc} */ + @Override public T process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e, + Object... args) { + GridCacheAtomicReferenceValue<T> val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name()); + + T curVal = val.get(); + + if (F.eq(expVal, curVal)) + e.setValue(new GridCacheAtomicReferenceValue<T>(newVal)); + + return curVal; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ReferenceCompareAndSetAndGetEntryProcessor.class, this); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheAtomicReferenceImpl.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 0661b11..d14bb47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -32,11 +32,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -256,7 +254,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc if (updateGuard.compareAndSet(false, true)) { try { try { - return updateCall.call(); + return retryTopologySafe(updateCall); } catch (IgniteCheckedException | IgniteException | IllegalStateException e) { throw e; @@ -303,86 +301,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } } - /** - * Asynchronous sequence update operation. Will add given amount to the sequence value. - * - * @param l Increment amount. - * @param updateCall Cache call that will update sequence reservation count in accordance with l. - * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value - * prior to update. - * @return Future indicating sequence value. - * @throws IgniteCheckedException If update failed. - */ - @SuppressWarnings("SignalWithoutCorrespondingAwait") - private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated) - throws IgniteCheckedException { - checkRemoved(); - - A.ensure(l > 0, " Parameter mustn't be less then 1: " + l); - - lock.lock(); - - try { - // If reserved range isn't exhausted. - if (locVal + l <= upBound) { - long curVal = locVal; - - locVal += l; - - return new GridFinishedFuture<>(updated ? locVal : curVal); - } - } - finally { - lock.unlock(); - } - - if (updateCall == null) - updateCall = internalUpdate(l, updated); - - while (true) { - if (updateGuard.compareAndSet(false, true)) { - try { - // This call must be outside lock. - return ctx.closures().callLocalSafe(updateCall, true); - } - finally { - lock.lock(); - - try { - updateGuard.set(false); - - cond.signalAll(); - } - finally { - lock.unlock(); - } - } - } - else { - lock.lock(); - - try { - while (locVal >= upBound && updateGuard.get()) - U.await(cond, 500, MILLISECONDS); - - checkRemoved(); - - // If reserved range isn't exhausted. - if (locVal + l <= upBound) { - long curVal = locVal; - - locVal += l; - - return new GridFinishedFuture<>(updated ? locVal : curVal); - } - } - finally { - lock.unlock(); - } - } - } - } - /** Get local batch size for this sequences. * * @return Sequence batch size. @@ -485,7 +403,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc */ @SuppressWarnings("TooBroadScope") private Callable<Long> internalUpdate(final long l, final boolean updated) { - return retryTopologySafe(new Callable<Long>() { + return new Callable<Long>() { @Override public Long call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seq = seqView.get(key); @@ -556,7 +474,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc throw e; } } - }); + }; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 09cea43..3f14942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -23,25 +23,20 @@ import java.io.InvalidObjectException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; -import java.util.concurrent.Callable; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.EntryProcessorResult; +import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; -import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgnitePredicate; - -import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Cache atomic stamped implementation. @@ -58,9 +53,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } }; - /** Logger. */ - private IgniteLogger log; - /** Atomic stamped name. */ private String name; @@ -79,42 +71,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt /** Cache context. */ private GridCacheContext ctx; - /** Callable for {@link #get()} operation */ - private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() { - @Override public IgniteBiTuple<T, S> call() throws Exception { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); - - if (stmp == null) - throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); - - return stmp.get(); - } - }); - - /** Callable for {@link #value()} operation */ - private final Callable<T> valCall = retryTopologySafe(new Callable<T>() { - @Override public T call() throws Exception { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); - - if (stmp == null) - throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); - - return stmp.value(); - } - }); - - /** Callable for {@link #stamp()} operation */ - private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() { - @Override public S call() throws Exception { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); - - if (stmp == null) - throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); - - return stmp.stamp(); - } - }); - /** * Empty constructor required by {@link Externalizable}. */ @@ -130,8 +86,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt * @param atomicView Atomic projection. * @param ctx Cache context. */ - public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey, - GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) { + public GridCacheAtomicStampedImpl(String name, + GridCacheInternalKey key, + IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView, + GridCacheContext ctx) { assert key != null; assert atomicView != null; assert ctx != null; @@ -141,8 +99,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt this.key = key; this.atomicView = atomicView; this.name = name; - - log = ctx.logger(getClass()); } /** {@inheritDoc} */ @@ -155,7 +111,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - return CU.outTx(getCall, ctx); + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -167,7 +128,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - CU.outTx(internalSet(val, stamp), ctx); + atomicView.invoke(key, new StampedSetEntryProcessor<>(val, stamp)); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -179,8 +143,15 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal), - F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx); + EntryProcessorResult<Boolean> res = + atomicView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp)); + + assert res != null && res.get() != null : res; + + return res.get(); + } + catch (EntryProcessorException e) { + throw new IgniteException(e.getMessage(), e); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -192,7 +163,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - return CU.outTx(stampCall, ctx); + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.stamp(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -204,7 +180,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt checkRemoved(); try { - return CU.outTx(valCall, ctx); + GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); + + if (stmp == null) + throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); + + return stmp.value(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -244,100 +225,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } } - /** - * Method make wrapper closure for existing value. - * - * @param val Value. - * @return Closure. - */ - private <N> IgniteClosure<N, N> wrapperClosure(final N val) { - return new IgniteClosure<N, N>() { - @Override public N apply(N e) { - return val; - } - }; - } - - /** - * Method returns callable for execution {@link #set(Object,Object)}} operation in async and sync mode. - * - * @param val Value will be set in the atomic stamped. - * @param stamp Stamp will be set in the atomic stamped. - * @return Callable for execution in async and sync mode. - */ - private Callable<Boolean> internalSet(final T val, final S stamp) { - return retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); - - if (stmp == null) - throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); - - stmp.set(val, stamp); - - atomicView.put(key, stmp); - - tx.commit(); - - return true; - } - catch (Error | Exception e) { - U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e); - - throw e; - } - } - }); - } - - /** - * Conditionally asynchronously sets the new value and new stamp. They will be set if - * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}. - * - * @param expValPred Predicate which should evaluate to {@code true} for value to be set - * @param newValClos Closure generates new value. - * @param expStampPred Predicate which should evaluate to {@code true} for value to be set - * @param newStampClos Closure generates new stamp value. - * @return Callable for execution in async and sync mode. - */ - private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred, - final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred, - final IgniteClosure<S, S> newStampClos) { - return retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key); - - if (stmp == null) - throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name); - - if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) { - tx.setRollbackOnly(); - - return false; - } - else { - stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp())); - - atomicView.getAndPut(key, stmp); - - tx.commit(); - - return true; - } - } - catch (Error | Exception e) { - U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" + - newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos + - ", atomicStamped=" + this + ']', e); - - throw e; - } - } - }); - } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx.kernalContext()); @@ -418,6 +305,104 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt return new IllegalStateException("Atomic stamped was removed from cache: " + name); } + /** + * + */ + static class StampedSetEntryProcessor<T, S> implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final T newVal; + + /** */ + private final S newStamp; + + /** + * @param newVal New value. + * @param newStamp New stamp value. + */ + StampedSetEntryProcessor(T newVal, S newStamp) { + this.newVal = newVal; + this.newStamp = newStamp; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e, + Object... args) { + GridCacheAtomicStampedValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name()); + + e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp)); + + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(StampedSetEntryProcessor.class, this); + } + } + + /** + * + */ + static class StampedCompareAndSetEntryProcessor<T, S> implements + CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Boolean> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final T expVal; + + /** */ + private final S expStamp; + + /** */ + private final T newVal; + + /** */ + private final S newStamp; + + /** + * @param expVal Expected value. + * @param expStamp Expected stamp. + * @param newVal New value. + * @param newStamp New stamp value. + */ + StampedCompareAndSetEntryProcessor(T expVal, S expStamp, T newVal, S newStamp) { + this.expVal = expVal; + this.expStamp = expStamp; + this.newVal = newVal; + this.newStamp = newStamp; + } + + /** {@inheritDoc} */ + @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e, + Object... args) { + GridCacheAtomicStampedValue val = e.getValue(); + + if (val == null) + throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name()); + + if (F.eq(expVal, val.value()) && F.eq(expStamp, val.stamp())) { + e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp)); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return GridToStringBuilder.toString(StampedCompareAndSetEntryProcessor.class, this); + } + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index ea80cc5..86e99a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -152,7 +152,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public int count() { try { - return CU.outTx(new GetCountCallable(), ctx); + GridCacheCountDownLatchValue latchVal = latchView.get(key); + + return latchVal == null ? 0 : latchVal.get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -208,7 +210,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc A.ensure(val > 0, "val should be positive"); try { - return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx); + return retryTopologySafe(new CountDownCallable(val)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -218,7 +220,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc}*/ @Override public void countDownAll() { try { - CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx); + retryTopologySafe(new CountDownCallable(0)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -255,23 +257,22 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc int state = initGuard.get(); if (state != READY_LATCH_STATE) { - /** Internal latch is not fully initialized yet. Remember latest latch value. */ + /* Internal latch is not fully initialized yet. Remember latest latch value. */ lastLatchVal = cnt; return; } - /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */ + /* 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */ latch0 = internalLatch; } - /** Internal latch is fully initialized and ready for the usage. */ + /* Internal latch is fully initialized and ready for the usage. */ assert latch0 != null; while (latch0.getCount() > cnt) latch0.countDown(); - } /** @@ -280,27 +281,24 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc private void initializeLatch() throws IgniteCheckedException { if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) { try { - internalLatch = CU.outTx( - retryTopologySafe(new Callable<CountDownLatch>() { - @Override public CountDownLatch call() throws Exception { - try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { - GridCacheCountDownLatchValue val = latchView.get(key); + internalLatch = retryTopologySafe(new Callable<CountDownLatch>() { + @Override public CountDownLatch call() throws Exception { + try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue val = latchView.get(key); - if (val == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find count down latch with given name: " + name); + if (val == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find count down latch with given name: " + name); - return new CountDownLatch(0); - } + return new CountDownLatch(0); + } - tx.commit(); + tx.commit(); - return new CountDownLatch(val.get()); - } + return new CountDownLatch(val.get()); } - }), - ctx - ); + } + }); synchronized (initGuard) { if (lastLatchVal != null) { @@ -392,18 +390,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** * */ - private class GetCountCallable implements Callable<Integer> { - /** {@inheritDoc} */ - @Override public Integer call() throws Exception { - GridCacheCountDownLatchValue latchVal = latchView.get(key); - - return latchVal == null ? 0 : latchVal.get(); - } - } - - /** - * - */ private class CountDownCallable implements Callable<Integer> { /** Value to count down on (if 0 then latch is counted down to 0). */ private final int val;