http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 936e9cc..ff4ba6d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -501,6 +501,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + @Override public IgniteInternalCache<K, V> withNoRetries() { + CacheOperationContext opCtx = new CacheOperationContext(false, null, false, null, true, null); + + return new GridCacheProxyImpl<>(ctx, this, opCtx); + } + + /** {@inheritDoc} */ @Override public CacheConfiguration configuration() { return ctx.config(); } @@ -2055,8 +2062,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } else { return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { - @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx) { - return tx.getAllAsync(ctx, keys, deserializeBinary, skipVals, false, !readThrough, needVer); + @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.getAllAsync(ctx, + readyTopVer, + keys, + deserializeBinary, + skipVals, + false, + !readThrough, + needVer); } }, ctx.operationContextPerCall()); } @@ -2088,7 +2102,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V V prevVal = syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAsync(ctx, key, val, true, filter).get().value(); + return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value(); } @Override public String toString() { @@ -2140,8 +2154,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, true, filter) + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, readyTopVer, key, val, true, filter) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2178,7 +2192,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAsync(ctx, key, val, false, filter).get().success(); + return tx.putAsync(ctx, null, key, val, false, filter).get().success(); } @Override public String toString() { @@ -2219,8 +2233,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); - return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + return asyncOp(new AsyncOp(drMap.keySet()) { + @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.putAllDrAsync(ctx, drMap); } @@ -2273,6 +2287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V tx.topologyVersion(topVer); IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, + null, key, (EntryProcessor<K, V, Object>)entryProcessor, args); @@ -2311,7 +2326,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); - IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, invokeMap, args); + IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, invokeMap, args); Map<K, EntryProcessorResult<T>> res = fut.get().value(); @@ -2331,12 +2346,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKey(key); - IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() { - @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() { + @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); - return tx.invokeAsync(ctx, invokeMap, args); + return tx.invokeAsync(ctx, readyTopVer, invokeMap, args); } @Override public String toString() { @@ -2374,15 +2389,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); - IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) { - @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { + @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { @Override public EntryProcessor apply(K k) { return entryProcessor; } }); - return tx.invokeAsync(ctx, invokeMap, args); + return tx.invokeAsync(ctx, readyTopVer, invokeMap, args); } @Override public String toString() { @@ -2414,9 +2429,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(map.keySet()); - IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(map.keySet()) { - @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { - return tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); + IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) { + @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.invokeAsync(ctx, + readyTopVer, + (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, + args); } @Override public String toString() { @@ -2451,7 +2469,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); + tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); return fut.get().value(); } @@ -2496,8 +2514,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, false, filter).chain( + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, + readyTopVer, + key, + val, + false, + filter).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2522,7 +2545,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAsync(ctx, key, val, true, ctx.noVal()).get().value(); + return (V)tx.putAsync(ctx, null, key, val, true, ctx.noVal()).get().value(); } @Override public String toString() { @@ -2543,8 +2566,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, true, ctx.noVal()) + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.noVal()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2572,7 +2595,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAsync(ctx, key, val, false, ctx.noVal()).get().success(); + return tx.putAsync(ctx, null, key, val, false, ctx.noVal()).get().success(); } @Override public String toString() { @@ -2598,8 +2621,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, false, ctx.noVal()).chain( + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, + readyTopVer, + key, + val, + false, + ctx.noVal()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2623,7 +2651,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAsync(ctx, key, val, true, ctx.hasVal()).get().value(); + return (V)tx.putAsync(ctx, null, key, val, true, ctx.hasVal()).get().value(); } @Override public String toString() { @@ -2644,8 +2672,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, true, ctx.hasVal()).chain( + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, readyTopVer, key, val, true, ctx.hasVal()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2669,7 +2697,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAsync(ctx, key, val, false, ctx.hasVal()).get().success(); + return tx.putAsync(ctx, null, key, val, false, ctx.hasVal()).get().success(); } @Override public String toString() { @@ -2686,8 +2714,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAsync(ctx, key, val, false, ctx.hasVal()).chain( + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAsync(ctx, readyTopVer, key, val, false, ctx.hasVal()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } @@ -2710,7 +2738,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAsync(ctx, key, newVal, false, ctx.equalsVal(oldVal)).get().success(); + return tx.putAsync(ctx, null, key, newVal, false, ctx.equalsVal(oldVal)).get() + .success(); } @Override public String toString() { @@ -2731,7 +2760,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { // Register before hiding in the filter. if (ctx.deploymentEnabled()) { try { @@ -2742,7 +2771,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } - return tx.putAsync(ctx, key, newVal, false, ctx.equalsVal(oldVal)).chain( + return tx.putAsync(ctx, readyTopVer, key, newVal, false, ctx.equalsVal(oldVal)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2771,7 +2800,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V syncOp(new SyncInOp(m.size() == 1) { @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - tx.putAllAsync(ctx, m, false).get(); + tx.putAllAsync(ctx, null, m, false).get(); } @Override public String toString() { @@ -2791,9 +2820,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(m.keySet()); - return asyncOp(new AsyncInOp(m.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, m, false).chain(RET2NULL); + return asyncOp(new AsyncOp(m.keySet()) { + @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.putAllAsync(ctx, + readyTopVer, + m, + false).chain(RET2NULL); } @Override public String toString() { @@ -2816,6 +2848,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V V prevVal = syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { V ret = tx.removeAllAsync(ctx, + null, Collections.singletonList(key), /*retval*/true, null, @@ -2850,9 +2883,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { // TODO should we invoke interceptor here? return tx.removeAllAsync(ctx, + readyTopVer, Collections.singletonList(key), /*retval*/true, null, @@ -2903,6 +2937,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V syncOp(new SyncInOp(keys.size() == 1) { @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllAsync(ctx, + null, keys, /*retval*/false, null, @@ -2930,9 +2965,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (keyCheck) validateCacheKeys(keys); - IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + IgniteInternalFuture<Object> fut = asyncOp(new AsyncOp(keys) { + @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, + readyTopVer, keys, /*retval*/false, null, @@ -2964,6 +3000,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V boolean rmv = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, + null, Collections.singletonList(key), /*retval*/false, null, @@ -3004,8 +3041,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { return tx.removeAllAsync(ctx, + readyTopVer, Collections.singletonList(key), /*retval*/false, filter, @@ -3034,7 +3072,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V syncOp(new SyncInOp(false) { @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - tx.removeAllDrAsync(ctx, drMap).get(); + tx.removeAllDrAsync(ctx, (Map)drMap).get(); } @Override public String toString() { @@ -3051,9 +3089,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); - return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { - return tx.removeAllDrAsync(ctx, drMap); + return asyncOp(new AsyncOp(drMap.keySet()) { + @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + return tx.removeAllDrAsync(ctx, (Map)drMap); } @Override public String toString() { @@ -3080,6 +3118,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.deploy().registerClass(val); return tx.removeAllAsync(ctx, + null, Collections.singletonList(key), /*retval*/false, ctx.equalsVal(val), @@ -3109,7 +3148,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { // Register before hiding in the filter. if (ctx.deploymentEnabled()) { try { @@ -3121,6 +3160,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } return tx.removeAllAsync(ctx, + readyTopVer, Collections.singletonList(key), /*retval*/false, ctx.equalsVal(val), @@ -3280,7 +3320,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - GridCacheEntryEx e = entry0(cacheKey, new AffinityTopologyVersion(ctx.discovery().topologyVersion()), + GridCacheEntryEx e = entry0(cacheKey, ctx.discovery().topologyVersionEx(), false, false); if (e == null) @@ -4257,31 +4297,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>( new IgniteCheckedException("Operation has been cancelled (node is stopping).")); - ctx.operationContextPerCall(opCtx); - - try { - return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() { - @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException { - try { - return tFut.get(); - } - catch (IgniteTxRollbackCheckedException e) { - throw e; - } - catch (IgniteCheckedException e1) { - tx0.rollbackAsync(); + return op.op(tx0, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() { + @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException { + try { + return tFut.get(); + } + catch (IgniteTxRollbackCheckedException e) { + throw e; + } + catch (IgniteCheckedException e1) { + tx0.rollbackAsync(); - throw e1; - } - finally { - ctx.shared().txContextReset(); - } + throw e1; } - }); - } - finally { - ctx.operationContextPerCall(null); - } + finally { + ctx.shared().txContextReset(); + } + } + }); } }); @@ -4290,7 +4323,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return f; } - final IgniteInternalFuture<T> f = op.op(tx).chain(new CX1<IgniteInternalFuture<T>, T>() { + final IgniteInternalFuture<T> f = op.op(tx, opCtx).chain(new CX1<IgniteInternalFuture<T>, T>() { @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException { try { return tFut.get(); @@ -5117,40 +5150,95 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param tx Transaction. - * @return Operation return value. + * @param readyTopVer Ready topology version. + * @return Operation future. */ - public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx); - } + public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer); - /** - * Cache operation. - */ - private abstract class AsyncInOp extends AsyncOp<Object> { /** - * + * @param tx Transaction. + * @param opCtx Operation context. + * @return Operation future. */ - protected AsyncInOp() { - super(); + public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) { + AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot(); + + if (txTopVer != null) + return op(tx, (AffinityTopologyVersion)null); + + // Tx needs affinity for entry creation, wait when affinity is ready to avoid blocking inside async operation. + final AffinityTopologyVersion topVer = ctx.shared().exchange().topologyVersion(); + + IgniteInternalFuture<?> topFut = ctx.shared().exchange().affinityReadyFuture(topVer); + + if (topFut == null || topFut.isDone()) + return op(tx, topVer); + else + return waitTopologyFuture(topFut, topVer, tx, opCtx); } /** - * @param keys Keys involved. + * @param topFut Topology future. + * @param topVer Topology version to use. + * @param tx Transaction. + * @param opCtx Operation context. + * @return Operation future. */ - protected AsyncInOp(Collection<?> keys) { - super(keys); - } + protected IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut, + final AffinityTopologyVersion topVer, + final IgniteTxLocalAdapter tx, + final CacheOperationContext opCtx) { + final GridFutureAdapter fut0 = new GridFutureAdapter(); + + topFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> topFut) { + try { + topFut.get(); - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public final IgniteInternalFuture<Object> op(IgniteTxLocalAdapter tx) { - return (IgniteInternalFuture<Object>)inOp(tx); + IgniteInternalFuture<?> opFut = runOp(tx, topVer, opCtx); + + opFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> opFut) { + try { + fut0.onDone(opFut.get()); + } + catch (IgniteCheckedException e) { + fut0.onDone(e); + } + } + }); + } + catch (IgniteCheckedException e) { + fut0.onDone(e); + } + } + }); + + return fut0; } /** * @param tx Transaction. - * @return Operation return value. + * @param topVer Ready topology version. + * @param opCtx Operation context. + * @return Future. */ - public abstract IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx); + private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx, + AffinityTopologyVersion topVer, + CacheOperationContext opCtx) { + ctx.operationContextPerCall(opCtx); + + ctx.shared().txContextReset(); + + try { + return op(tx, topVer); + } + finally { + ctx.shared().txContextReset(); + + ctx.operationContextPerCall(null); + } + } } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 21975da..e264043 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -19,8 +19,10 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -43,7 +45,7 @@ import java.util.UUID; */ public class GridCacheAffinityManager extends GridCacheManagerAdapter { /** */ - private static final AffinityTopologyVersion TOP_FIRST = new AffinityTopologyVersion(1); + private static final AffinityTopologyVersion LOC_CACHE_TOP_VER = new AffinityTopologyVersion(1); /** */ public static final String FAILED_TO_FIND_CACHE_ERR_MSG = "Failed to find cache (cache was not started " + @@ -52,25 +54,45 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { /** Affinity cached function. */ private GridAffinityAssignmentCache aff; + /** */ + private AffinityFunction affFunction; + + /** */ + private AffinityKeyMapper affMapper; + /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - aff = new GridAffinityAssignmentCache(cctx, cctx.namex(), cctx.config().getAffinity(), - cctx.config().getAffinityMapper(), cctx.config().getBackups()); + affFunction = cctx.config().getAffinity(); + affMapper = cctx.config().getAffinityMapper(); + + aff = new GridAffinityAssignmentCache(cctx.kernalContext(), + cctx.namex(), + affFunction, + cctx.config().getNodeFilter(), + cctx.config().getBackups(), + cctx.isLocal()); } /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { if (cctx.isLocal()) // No discovery event needed for local affinity. - aff.calculate(TOP_FIRST, null); + aff.calculate(LOC_CACHE_TOP_VER, null); } /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { + cancelFutures(); + } + + /** + * + */ + public void cancelFutures() { IgniteCheckedException err = new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); - aff.onKernalStop(err); + aff.cancelFutures(err); } /** {@inheritDoc} */ @@ -78,7 +100,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, "Failed to wait for topology update, client disconnected."); - aff.onKernalStop(err); + aff.cancelFutures(err); } /** @@ -144,52 +166,23 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** - * Initializes affinity for joined node. - * - * @param topVer Topology version. - * @param affAssignment Affinity assignment for this topology version. - */ - public void initializeAffinity(AffinityTopologyVersion topVer, List<List<ClusterNode>> affAssignment) { - assert !cctx.isLocal(); - - aff.initialize(topVer, affAssignment); - } - - /** * @param topVer Topology version. * @return Affinity assignments. */ public List<List<ClusterNode>> assignments(AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = new AffinityTopologyVersion(1); + topVer = LOC_CACHE_TOP_VER; return aff.assignments(topVer); } /** - * Calculates affinity cache for given topology version. - * - * @param topVer Topology version to calculate affinity for. - * @param discoEvt Discovery event that causes this topology change. - * @return Affinity assignments. + * @return Assignment. */ - public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) { + public List<List<ClusterNode>> idealAssignment() { assert !cctx.isLocal(); - return aff.calculate(topVer, discoEvt); - } - - /** - * Copies previous affinity assignment when discovery event does not cause affinity assignment changes - * (e.g. client node joins on leaves). - * - * @param evt Event. - * @param topVer Topology version. - */ - public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) { - assert !cctx.isLocal(); - - aff.clientEventTopologyChange(evt, topVer); + return aff.idealAssignment(); } /** @@ -218,7 +211,21 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { if (aff0 == null) throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); - return aff0.partition(key); + return affFunction.partition(affinityKey(key)); + } + + /** + * If Key is {@link GridCacheInternal GridCacheInternal} entry when won't passed into user's mapper and + * will use {@link GridCacheDefaultAffinityKeyMapper default}. + * + * @param key Key. + * @return Affinity key. + */ + private Object affinityKey(Object key) { + if (key instanceof CacheObject && !(key instanceof BinaryObject)) + key = ((CacheObject)key).value(cctx.cacheObjectContext(), false); + + return (key instanceof GridCacheInternal ? cctx.defaultAffMapper() : affMapper).affinityKey(key); } /** @@ -237,7 +244,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { */ public List<ClusterNode> nodes(int part, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = new AffinityTopologyVersion(1); + topVer = LOC_CACHE_TOP_VER; GridAffinityAssignmentCache aff0 = aff; @@ -250,12 +257,12 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { /** * Get affinity assignment for the given topology version. * - * @param topVer Toplogy version. - * @return Affinity affignment. + * @param topVer Topology version. + * @return Affinity assignment. */ public GridAffinityAssignment assignment(AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = new AffinityTopologyVersion(1); + topVer = LOC_CACHE_TOP_VER; GridAffinityAssignmentCache aff0 = aff; @@ -401,7 +408,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { */ public Set<Integer> primaryPartitions(UUID nodeId, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = new AffinityTopologyVersion(1); + topVer = LOC_CACHE_TOP_VER; GridAffinityAssignmentCache aff0 = aff; @@ -418,7 +425,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { */ public Set<Integer> backupPartitions(UUID nodeId, AffinityTopologyVersion topVer) { if (cctx.isLocal()) - topVer = new AffinityTopologyVersion(1); + topVer = LOC_CACHE_TOP_VER; GridAffinityAssignmentCache aff0 = aff; @@ -449,4 +456,28 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { if (aff0 != null) aff0.dumpDebugInfo(); } + + /** + * @return Affinity cache. + */ + public GridAffinityAssignmentCache affinityCache() { + return aff; + } + + /** + * @param part Partition. + * @param startVer Start version. + * @param endVer End version. + * @return {@code True} if primary changed or required affinity version not found in history. + */ + public boolean primaryChanged(int part, AffinityTopologyVersion startVer, AffinityTopologyVersion endVer) { + assert !cctx.isLocal() : cctx.name(); + + GridAffinityAssignmentCache aff0 = aff; + + if (aff0 == null) + throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); + + return aff0.primaryChanged(part, startVer, endVer); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java index ad8ade1..ffce82d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllRunnable.java @@ -23,7 +23,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index d159afa..88d6e04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1260,7 +1260,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * Sets thread local cache operation context. * - * @param opCtx Flags to set. + * @param opCtx Operation context. */ public void operationContextPerCall(@Nullable CacheOperationContext opCtx) { if (nearContext()) http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index b1a57a9..a21e18b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -1474,9 +1474,16 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { ClusterNode loc = cctx.localNode(); + AffinityTopologyVersion initTopVer = + new AffinityTopologyVersion(cctx.discovery().localJoinEvent().topologyVersion(), 0); + + AffinityTopologyVersion cacheStartVer = cctx.startTopologyVersion(); + + if (cacheStartVer != null && cacheStartVer.compareTo(initTopVer) > 0) + initTopVer = cacheStartVer; + // Initialize. - primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(), - cctx.affinity().affinityTopologyVersion())); + primaryParts.addAll(cctx.affinity().primaryPartitions(cctx.localNodeId(), initTopVer)); while (!isCancelled()) { DiscoveryEvent evt = evts.take(); @@ -1484,12 +1491,14 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (log.isDebugEnabled()) log.debug("Processing event: " + evt); + AffinityTopologyVersion topVer = new AffinityTopologyVersion(evt.topologyVersion()); + // Remove partitions that are no longer primary. for (Iterator<Integer> it = primaryParts.iterator(); it.hasNext();) { if (!evts.isEmpty()) break; - if (!cctx.affinity().primary(loc, it.next(), new AffinityTopologyVersion(evt.topologyVersion()))) + if (!cctx.affinity().primary(loc, it.next(), topVer)) it.remove(); } @@ -1501,8 +1510,7 @@ public class GridCacheEvictionManager extends GridCacheManagerAdapter { if (!evts.isEmpty()) break; - if (part.primary(new AffinityTopologyVersion(evt.topologyVersion())) - && primaryParts.add(part.id())) { + if (part.primary(topVer) && primaryParts.add(part.id())) { if (log.isDebugEnabled()) log.debug("Touching partition entries: " + part); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b297827..aab1bcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -128,16 +128,33 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order()); - assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 : - "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']'; + DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheMsg.cacheId()); - // Need to wait for initial exchange to avoid race between cache start and affinity request. + if (cacheDesc != null) { + if (cacheDesc.startTopologyVersion() != null) + startTopVer = cacheDesc.startTopologyVersion(); + else if (cacheDesc.receivedFromStartVersion() != null) + startTopVer = cacheDesc.receivedFromStartVersion(); + } + + // Need to wait for exchange to avoid race between cache start and affinity request. fut = cctx.exchange().affinityReadyFuture(startTopVer); if (fut != null && !fut.isDone()) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { - @Override public void run() { - lsnr.onMessage(nodeId, cacheMsg); + if (log.isDebugEnabled()) { + log.debug("Wait for exchange before processing message [msg=" + msg + + ", node=" + nodeId + + ", waitVer=" + startTopVer + + ", cacheDesc=" + cacheDesc + ']'); + } + + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> fut) { + cctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + handleMessage(nodeId, cacheMsg); + } + }); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 511eed4..cc1a8d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1720,7 +1720,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Must persist inside synchronization in non-tx mode. cctx.store().put(null, key, updated, ver); - // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. updateIndex(updated, expireTime, ver, old); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java index aba8318..f1c1b83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccCandidate.java @@ -29,11 +29,11 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; @@ -602,7 +602,13 @@ public class GridCacheMvccCandidate implements Externalizable, @Override public void writeExternal(ObjectOutput out) throws IOException { IgniteUtils.writeUuid(out, nodeId); - CU.writeVersion(out, ver); + out.writeBoolean(ver == null); + + if (ver != null) { + out.writeBoolean(ver instanceof GridCacheVersionEx); + + ver.writeExternal(out); + } out.writeLong(timeout); out.writeLong(threadId); @@ -614,7 +620,11 @@ public class GridCacheMvccCandidate implements Externalizable, @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { nodeId = IgniteUtils.readUuid(in); - ver = CU.readVersion(in); + if (!in.readBoolean()) { + ver = in.readBoolean() ? new GridCacheVersionEx() : new GridCacheVersion(); + + ver.readExternal(in); + } timeout = in.readLong(); threadId = in.readLong(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 462c511..0d9f174 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -110,7 +110,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private static final int EXCHANGE_HISTORY_SIZE = 1000; /** Cleanup history size. */ - public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 100); + public static final int EXCH_FUT_CLEANUP_HISTORY_SIZE = getInteger(IGNITE_AFFINITY_HISTORY_SIZE, 500); /** Atomic reference for pending timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); @@ -189,8 +189,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED) { assert cctx.discovery().node(n.id()) == null; - for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) - f.onNodeLeft(n.id()); + // Avoid race b/w initial future add and discovery event. + GridDhtPartitionsExchangeFuture initFut = null; + + if (readyTopVer.get().equals(AffinityTopologyVersion.NONE)) { + initFut = exchangeFuture(initialExchangeId(), null, null, null); + + initFut.onNodeLeft(n); + } + + for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) { + if (f != initFut) + f.onNodeLeft(n); + } } assert @@ -202,7 +213,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affinityTopologyVersion(e), e.type()); - exchFut = exchangeFuture(exchId, e, null); + exchFut = exchangeFuture(exchId, e, null, null); } else { DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; @@ -237,9 +248,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!F.isEmpty(valid)) { exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); - exchFut = exchangeFuture(exchId, e, valid); + exchFut = exchangeFuture(exchId, e, valid, null); } } + else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) { + CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage(); + + if (msg.exchangeId() == null) { + if (msg.exchangeNeeded()) { + exchId = exchangeId(n.id(), affinityTopologyVersion(e), e.type()); + + exchFut = exchangeFuture(exchId, e, null, msg); + } + } + else + exchangeFuture(msg.exchangeId(), null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg); + } } if (exchId != null) { @@ -301,6 +325,21 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return reconnectExchangeFut; } + /** + * @return Initial exchange ID. + */ + private GridDhtPartitionExchangeId initialExchangeId() { + DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); + + assert discoEvt != null; + + final AffinityTopologyVersion startTopVer = affinityTopologyVersion(discoEvt); + + assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); + + return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED); + } + /** {@inheritDoc} */ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { super.onKernalStart0(reconnect); @@ -314,15 +353,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Generate dummy discovery event for local node joining. DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); - final AffinityTopologyVersion startTopVer = affinityTopologyVersion(discoEvt); - - GridDhtPartitionExchangeId exchId = exchangeId(loc.id(), startTopVer, EVT_NODE_JOINED); - - assert discoEvt != null; - - assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); + GridDhtPartitionExchangeId exchId = initialExchangeId(); - GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null); + GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null, null); if (reconnect) reconnectExchangeFut = new GridFutureAdapter<>(); @@ -339,14 +372,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; try { - if (m instanceof GridDhtPartitionSupplyMessageV2) - cctx.cacheContext(m.cacheId).preloader().handleSupplyMessage( - idx, id, (GridDhtPartitionSupplyMessageV2)m); - else if (m instanceof GridDhtPartitionDemandMessage) - cctx.cacheContext(m.cacheId).preloader().handleDemandMessage( - idx, id, (GridDhtPartitionDemandMessage)m); - else - log.error("Unsupported message type: " + m.getClass().getName()); + GridCacheContext cacheCtx = cctx.cacheContext(m.cacheId); + + if (cacheCtx != null) { + if (m instanceof GridDhtPartitionSupplyMessageV2) + cacheCtx.preloader().handleSupplyMessage( + idx, id, (GridDhtPartitionSupplyMessageV2)m); + else if (m instanceof GridDhtPartitionDemandMessage) + cacheCtx.preloader().handleDemandMessage( + idx, id, (GridDhtPartitionDemandMessage)m); + else + U.error(log, "Unsupported message type: " + m.getClass().getName()); + } } finally { leaveBusy(); @@ -708,28 +745,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * Refresh partitions. - * - * @param timeout Timeout. - */ - private void refreshPartitions(long timeout) { - long last = lastRefresh.get(); - - long now = U.currentTimeMillis(); - - if (last != -1 && now - last >= timeout && lastRefresh.compareAndSet(last, now)) { - if (log.isDebugEnabled()) - log.debug("Refreshing partitions [last=" + last + ", now=" + now + ", delta=" + (now - last) + - ", timeout=" + timeout + ", lastRefresh=" + lastRefresh + ']'); - - refreshPartitions(); - } - else if (log.isDebugEnabled()) - log.debug("Partitions were not refreshed [last=" + last + ", now=" + now + ", delta=" + (now - last) + - ", timeout=" + timeout + ", lastRefresh=" + lastRefresh + ']'); - } - - /** * @param nodes Nodes. * @return {@code True} if message was sent, {@code false} if node left grid. */ @@ -838,20 +853,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param exchId Exchange ID. * @param discoEvt Discovery event. * @param reqs Cache change requests. + * @param affChangeMsg Affinity change message. * @return Exchange future. */ GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId, - @Nullable DiscoveryEvent discoEvt, @Nullable Collection<DynamicCacheChangeRequest> reqs) { + @Nullable DiscoveryEvent discoEvt, + @Nullable Collection<DynamicCacheChangeRequest> reqs, + @Nullable CacheAffinityChangeMessage affChangeMsg) { GridDhtPartitionsExchangeFuture fut; GridDhtPartitionsExchangeFuture old = exchFuts.addx( - fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs)); + fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs, affChangeMsg)); if (old != null) { fut = old; if (reqs != null) fut.cacheChangeRequests(reqs); + + if (affChangeMsg != null) + fut.affinityChangeMessage(affChangeMsg); } if (discoEvt != null) @@ -892,7 +913,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer)); - for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet()) + AffinityTopologyVersion histVer = new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0); + + for (AffinityTopologyVersion oldVer : nodeVers.headMap(histVer).keySet()) nodeVers.remove(oldVer); if (err == null) { @@ -940,11 +963,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana skipped++; if (skipped == EXCH_FUT_CLEANUP_HISTORY_SIZE) { - for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (err == null) { + if (err == null) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!cacheCtx.isLocal()) cacheCtx.affinity().cleanUpCache(fut.topologyVersion()); } + + cctx.affinity().cleanUpCache(fut.topologyVersion()); } } @@ -1006,7 +1031,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana refreshPartitions(); } else - exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); } finally { leaveBusy(); @@ -1047,8 +1072,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else if (!cacheCtx.isLocal()) top = cacheCtx.topology(); - if (top != null) + if (top != null) { updated |= top.update(null, entry.getValue(), null) != null; + + cctx.affinity().checkRebalanceState(top, cacheId); + } } if (updated) @@ -1058,17 +1086,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (msg.client()) { final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(), null, + null, null); exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { // Finished future should reply only to sender client node. - exchFut.onReceive(node.id(), msg); + exchFut.onReceive(node, msg); } }); } else - exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg); + exchangeFuture(msg.exchangeId(), null, null, null).onReceive(node, msg); } } finally { @@ -1127,26 +1156,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - dumpPendingObjects(); + dumpPendingObjects(null); for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.preloader().dumpDebugInfo(); + cctx.affinity().dumpDebugInfo(); + // Dump IO manager statistics. cctx.gridIO().dumpStats(); } /** - * + * @param exchTopVer Exchange topology version. */ - public void dumpPendingObjects() { + public void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) { IgniteTxManager tm = cctx.tm(); if (tm != null) { U.warn(log, "Pending transactions:"); - for (IgniteInternalTx tx : tm.activeTransactions()) - U.warn(log, ">>> " + tx); + for (IgniteInternalTx tx : tm.activeTransactions()) { + if (exchTopVer != null) { + U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + + ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) + + ", tx=" + tx + ']'); + } + else + U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']'); + } } GridCacheMvccManager mvcc = cctx.mvcc(); @@ -1430,7 +1468,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (marshR != null || !rebalanceQ.isEmpty()) { if (futQ.isEmpty()) { - U.log(log, "Rebalancing required" + + U.log(log, "Rebalancing required " + "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index be019fc..acd8c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -73,18 +73,6 @@ public interface GridCachePreloader { public void onInitialExchangeComplete(@Nullable Throwable err); /** - * Callback by exchange manager when new exchange future is added to worker. - */ - public void onExchangeFutureAdded(); - - /** - * Updates last exchange future. - * - * @param lastFut Last future. - */ - public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut); - - /** * @param exchFut Exchange future to assign. * @return Assignments or {@code null} if detected that there are pending exchanges. */ @@ -192,11 +180,9 @@ public interface GridCachePreloader { public void evictPartitionAsync(GridDhtLocalPartition part); /** - * Handles new topology. - * - * @param topVer Topology version. + * @param lastFut Last future. */ - public void onTopologyChanged(AffinityTopologyVersion topVer); + public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut); /** * Dumps debug information. http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 5d98c6f..e393421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -159,16 +159,6 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void onExchangeFutureAdded() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) { - // No-op. - } - - /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { return null; } @@ -185,7 +175,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public void onTopologyChanged(AffinityTopologyVersion topVer) { + @Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 67d6a6c..38f861b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -104,6 +104,7 @@ import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; @@ -138,6 +139,7 @@ import static org.apache.ignite.configuration.DeploymentMode.CONTINUOUS; import static org.apache.ignite.configuration.DeploymentMode.ISOLATED; import static org.apache.ignite.configuration.DeploymentMode.PRIVATE; import static org.apache.ignite.configuration.DeploymentMode.SHARED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; @@ -645,6 +647,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { desc.locallyConfigured(true); desc.staticallyConfigured(true); + desc.receivedFrom(ctx.localNodeId()); if (!template) { registeredCaches.put(masked, desc); @@ -757,10 +760,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon() && !CU.isMarshallerCache(desc.cacheConfiguration().getName())) continue; - boolean started = desc.onStart(); - - assert started : "Failed to change started flag for locally configured cache: " + desc; - desc.clearRemoteConfigurations(); CacheConfiguration ccfg = desc.cacheConfiguration(); @@ -770,6 +769,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean loc = desc.locallyConfigured(); if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) { + boolean started = desc.onStart(); + + assert started : "Failed to change started flag for locally configured cache: " + desc; + CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); @@ -890,6 +893,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { // No new caches should be added after this point. exch.onKernalStop(cancel); + for (GridCacheAdapter<?, ?> cache : caches.values()) { + GridCacheAffinityManager aff = cache.context().affinity(); + + if (aff != null) + aff.cancelFutures(); + } + for (String cacheName : stopSeq) { GridCacheAdapter<?, ?> cache = caches.remove(maskNull(cacheName)); @@ -1287,12 +1297,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { storeMgr.initialize(cfgStore, sesHolders); + boolean affNode = CU.affinityNode(ctx.discovery().localNode(), cfg.getNodeFilter()); + GridCacheContext<?, ?> cacheCtx = new GridCacheContext( ctx, sharedCtx, cfg, cacheType, - ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + affNode, updatesAllowed, /* @@ -1421,7 +1433,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx, cfg, cacheType, - ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + affNode, true, /* @@ -1531,7 +1543,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { }, new IgnitePredicate<DynamicCacheDescriptor>() { @Override public boolean apply(DynamicCacheDescriptor desc) { - return desc.started() && desc.cacheType().userCache(); + return desc.cacheType().userCache(); } } ); @@ -1549,42 +1561,57 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * @param reqs Requests to start. + * @param req Cache start request. * @param topVer Topology version. - * @throws IgniteCheckedException If failed to start cache. + * @throws IgniteCheckedException If failed. */ - @SuppressWarnings("TypeMayBeWeakened") - public void prepareCachesStart( - Collection<DynamicCacheChangeRequest> reqs, - AffinityTopologyVersion topVer - ) throws IgniteCheckedException { - if (ctx.isDaemon()) - return; + public void prepareCacheStart(DynamicCacheChangeRequest req, AffinityTopologyVersion topVer) + throws IgniteCheckedException { + assert req.start() : req; + assert req.cacheType() != null : req; + + prepareCacheStart( + req.startCacheConfiguration(), + req.nearCacheConfiguration(), + req.cacheType(), + req.clientStartOnly(), + req.initiatingNodeId(), + req.deploymentId(), + topVer + ); - for (DynamicCacheChangeRequest req : reqs) { - assert req.start() : req; - assert req.cacheType() != null : req; - - prepareCacheStart( - req.startCacheConfiguration(), - req.nearCacheConfiguration(), - req.cacheType(), - req.clientStartOnly(), - req.initiatingNodeId(), - req.deploymentId(), - topVer - ); + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); + if (desc != null) + desc.onStart(); + } - if (desc != null) - desc.onStart(); - } + /** + * Starts statically configured caches received from remote nodes during exchange. + * + * @param topVer Topology version. + * @throws IgniteCheckedException If failed. + * @return Started caches descriptors. + */ + public Collection<DynamicCacheDescriptor> startReceivedCaches(AffinityTopologyVersion topVer) + throws IgniteCheckedException { + List<DynamicCacheDescriptor> started = null; - // Start statically configured caches received from remote nodes during exchange. for (DynamicCacheDescriptor desc : registeredCaches.values()) { - if (desc.staticallyConfigured() && !desc.locallyConfigured()) { + if (!desc.started() && desc.staticallyConfigured() && !desc.locallyConfigured()) { + if (desc.receivedFrom() != null) { + AffinityTopologyVersion startVer = desc.receivedFromStartVersion(); + + if (startVer == null || startVer.compareTo(topVer) > 0) + continue; + } + if (desc.onStart()) { + if (started == null) + started = new ArrayList<>(); + + started.add(desc); + prepareCacheStart( desc.cacheConfiguration(), null, @@ -1597,6 +1624,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } } + + return started; } /** @@ -1786,6 +1815,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param kernalCtx Kernal context. * @param storeSesLsnrs Store session listeners. * @return Shared context. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, @@ -1796,6 +1826,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager(); GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager(); GridCacheIoManager ioMgr = new GridCacheIoManager(); + CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager(); CacheJtaManagerAdapter jta = JTA.createOptional(); @@ -1806,6 +1837,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { mvccMgr, depMgr, exchMgr, + topMgr, ioMgr, jta, storeSesLsnrs @@ -1845,6 +1877,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.deploymentId(desc.deploymentId()); + req.receivedFrom(desc.receivedFrom()); + reqs.add(req); Boolean nearEnabled = cache.isNear(); @@ -1868,6 +1902,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.deploymentId(desc.deploymentId()); + req.receivedFrom(desc.receivedFrom()); + reqs.add(req); } @@ -1943,9 +1979,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration ccfg = req.startCacheConfiguration(); if (existing != null) { - if (existing.locallyConfigured()) { + if (joiningNodeId.equals(ctx.localNodeId())) { + existing.receivedFrom(req.receivedFrom()); + existing.deploymentId(req.deploymentId()); + } + if (existing.locallyConfigured()) { existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); ctx.discovery().setCacheFilter( @@ -1972,6 +2012,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (joiningNodeId.equals(ctx.localNodeId())) desc.receivedOnDiscovery(true); + desc.receivedFrom(req.receivedFrom()); + DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc); assert old == null : old; @@ -2442,7 +2484,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** - * Callback invoked from discovery thread when discovery custom message is received. + * @param type Event type. + * @param node Event node. + * @param topVer Topology version. + */ + public void onDiscoveryEvent(int type, ClusterNode node, AffinityTopologyVersion topVer) { + if (type == EVT_NODE_JOINED) { + for (DynamicCacheDescriptor cacheDesc : registeredCaches.values()) { + if (node.id().equals(cacheDesc.receivedFrom())) + cacheDesc.receivedFromStartVersion(topVer); + } + } + + sharedCtx.affinity().onDiscoveryEvent(type, node, topVer); + } + + /** + * Callback invoked from discovery thread when discovery custom message is received. * * @param msg Customer message. * @param topVer Current topology version. @@ -2450,6 +2508,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { */ public boolean onCustomEvent(DiscoveryCustomMessage msg, AffinityTopologyVersion topVer) { + if (msg instanceof CacheAffinityChangeMessage) + return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg)); + return msg instanceof DynamicCacheChangeBatch && onCacheChangeRequested((DynamicCacheChangeBatch) msg, topVer); } @@ -3127,6 +3188,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Cache descriptors. + */ + public Collection<DynamicCacheDescriptor> cacheDescriptors() { + return registeredCaches.values(); + } + + /** * @param cacheId Cache ID. * @return Cache descriptor. */ @@ -3438,10 +3506,45 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If validation failed. * @return Configuration copy. */ - private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException { + private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException { if (val == null) return null; + return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() { + @Override public CacheConfiguration applyx() throws IgniteCheckedException { + if (val.getCacheStoreFactory() != null) { + try { + ClassLoader ldr = ctx.config().getClassLoader(); + + if (ldr == null) + ldr = val.getCacheStoreFactory().getClass().getClassLoader(); + + marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), + U.resolveClassLoader(ldr, ctx.config())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to validate cache configuration. " + + "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e); + } + } + + try { + return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to validate cache configuration " + + "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e); + } + } + }); + } + + /** + * @param c Closure. + * @throws IgniteCheckedException If failed. + * @return Closure result. + */ + private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException { IgniteCacheObjectProcessor objProc = ctx.cacheObjects(); BinaryContext oldCtx = null; @@ -3452,29 +3555,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { } try { - if (val.getCacheStoreFactory() != null) { - try { - ClassLoader ldr = ctx.config().getClassLoader(); - - if (ldr == null) - ldr = val.getCacheStoreFactory().getClass().getClassLoader(); - - marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), - U.resolveClassLoader(ldr, ctx.config())); - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to validate cache configuration. " + - "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e); - } - } - - try { - return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config())); - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to validate cache configuration " + - "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e); - } + return c.applyx(); } finally { if (objProc instanceof CacheObjectBinaryProcessorImpl) @@ -3483,6 +3564,19 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @param obj Object to clone. + * @return Object copy. + * @throws IgniteCheckedException If failed. + */ + public <T> T clone(final T obj) throws IgniteCheckedException { + return withBinaryContext(new IgniteOutClosureX<T>() { + @Override public T applyx() throws IgniteCheckedException { + return marshaller.unmarshal(marshaller.marshal(obj), U.resolveClassLoader(ctx.config())); + } + }); + }; + + /** * @param name Name to mask. * @return Masked name. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e7e223f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 66304e4..03735cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1659,6 +1659,19 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public IgniteInternalCache<K, V> withNoRetries() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return new GridCacheProxyImpl<>(ctx, delegate, + new CacheOperationContext(false, null, false, null, true, null)); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); out.writeObject(delegate);
