IGNITE-GG-10837 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93656982 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93656982 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93656982 Branch: refs/heads/ignite-gg-10837 Commit: 93656982b8cbf993445676a92885234788ed6533 Parents: a011bb7 Author: nikolay_tikhonov <[email protected]> Authored: Tue Dec 15 12:53:55 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Dec 15 13:02:14 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 34 +++++++++++++++++ .../processors/cache/GridCacheProxyImpl.java | 19 ++++++++++ .../processors/cache/IgniteInternalCache.java | 22 +++++++++++ .../dht/atomic/GridDhtAtomicCache.java | 28 +++++++++++++- .../distributed/near/GridNearAtomicCache.java | 12 ++++++ .../processors/cache/dr/GridCacheDrInfo.java | 9 ++++- .../transactions/IgniteTxLocalAdapter.java | 39 +++++++++++++++++--- .../cache/transactions/IgniteTxLocalEx.java | 10 +++++ 8 files changed, 165 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index cc4e962..914797f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2077,6 +2077,40 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ + 
@Override public void invokeAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> map, + final Object... args) throws IgniteCheckedException { + if (F.isEmpty(map)) + return; + + syncOp(new SyncInOp(map.size() == 1) { + @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + tx.invokeAllDrAsync(ctx, map, args).get(); + } + + @Override public String toString() { + return "invokeAllDrAsync [drMap=" + map + ']'; + } + }); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> invokeAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> map, + final Object... args) throws IgniteCheckedException { + if (F.isEmpty(map)) + return new GridFinishedFuture<Object>(); + + return asyncOp(new AsyncInOp(map.keySet()) { + @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + return tx.invokeAllDrAsync(ctx, map, args); + } + + @Override public String toString() { + return "invokeAllDrAsync [drMap=" + map + ']'; + } + }); + } + + /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(final K key, final EntryProcessor<K, V, T> entryProcessor, final Object... 
args) http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d1d93d8..3753a9f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -523,6 +523,25 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte } /** {@inheritDoc} */ + @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, + Object... args) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + delegate.invokeAllConflict(map, args); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, + Object... args) throws IgniteCheckedException { + return null; + } + + /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... 
args) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 186de68..61dc13b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1557,6 +1557,28 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> { throws IgniteCheckedException; /** + * Invoke with conflict resolution. + * + * @param map Map containing keys and entry processors to be applied to values. + * @param args Arguments. + * @return Invoke results. + * @throws IgniteCheckedException If failed. + */ + public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) + throws IgniteCheckedException; + + /** + * Invoke async with conflict resolution. + * + * @param map Map containing keys and entry processors to be applied to values. + * @param args Arguments. + * @return Future for the invoke operation. + * @throws IgniteCheckedException If failed. + */ + public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) + throws IgniteCheckedException; + + /** * Removes DR data. * * @param drMap DR map. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 481317a..4f2f8f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -587,6 +587,27 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, Object... args) + throws IgniteCheckedException { + invokeAllConflictAsync(map, args).get(); + } + + /** {@inheritDoc} */ + public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap, + Object... args) throws IgniteCheckedException { + + return updateAllAsync0(null, + null, + args, + conflictMap, + null, + false, + false, + null, + true); + } + + /** {@inheritDoc} */ @Override public V getAndRemove(K key) throws IgniteCheckedException { return getAndRemoveAsync(key).get(); } @@ -897,11 +918,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); + GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE; + + if (op == UPDATE && conflictPutMap != null && !conflictPutMap.isEmpty()) + op = F.firstEntry(conflictPutMap).getValue().entryProcessor() != null ? 
TRANSFORM : UPDATE; + final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( ctx, this, ctx.config().getWriteSynchronizationMode(), - invokeMap != null ? TRANSFORM : UPDATE, + op, map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ? conflictPutMap.keySet() : conflictRmvMap.keySet(), map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 06898cd..335266d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -545,6 +545,18 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public void invokeAllConflict(Map<KeyCacheObject, GridCacheDrInfo> map, + Object... args) throws IgniteCheckedException { + dht.invokeAllConflict(map, args); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> invokeAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> map, + Object... args) throws IgniteCheckedException { + return dht.invokeAllConflictAsync(map, args); + } + + /** {@inheritDoc} */ @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... 
args) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java index d37eb7b..1ec45e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java @@ -71,7 +71,7 @@ public class GridCacheDrInfo implements Externalizable { * @param ver Version. */ public GridCacheDrInfo(EntryProcessor proc, GridCacheVersion ver) { - assert val != null; + assert proc != null; assert ver != null; this.proc = proc; @@ -86,6 +86,13 @@ public class GridCacheDrInfo implements Externalizable { } /** + * @return Entry processor. + */ + public EntryProcessor entryProcessor() { + return proc; + } + + /** * @return Value (entry processor or cache object. 
*/ public Object valueEx() { http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index a3aed34..8e7069d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1979,6 +1979,26 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ + @Override public IgniteInternalFuture<?> invokeAllDrAsync(GridCacheContext cacheCtx, + Map<KeyCacheObject, GridCacheDrInfo> drMap, + Object... 
args + ) { + Map<Object, EntryProcessor<Object, Object, Object>> invoke = new LinkedHashMap<>(); + + for (Map.Entry<KeyCacheObject, GridCacheDrInfo> e : drMap.entrySet()) + if (e.getValue().entryProcessor() != null) + invoke.put(e.getKey(), e.getValue().entryProcessor()); + + return this.<Object, Object>putAllAsync0(cacheCtx, + null, + invoke, + args, + drMap, + true, + null); + } + + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, @@ -3062,13 +3082,20 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (drMap != null) { assert map == null; - map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { - @Override public Object apply(GridCacheDrInfo val) { - return val.value(); - } - }); + if (invokeMap == null) { + map0 = F.viewReadOnly(drMap, new IgniteClosure<GridCacheDrInfo, Object>() { + @Override public Object apply(GridCacheDrInfo val) { + return val.value(); + } + }); + + invokeMap0 = null; + } + else { + map0 = null; - invokeMap0 = null; + invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; + } } else { map0 = map; http://git-wip-us.apache.org/repos/asf/ignite/blob/93656982/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index a5d3373..2895a81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -156,6 +156,16 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * 
@param cacheCtx Cache context. + * @param drMap DR map to invoke on. + * @return Future for DR invoke operation. + */ + public IgniteInternalFuture<?> invokeAllDrAsync( + GridCacheContext cacheCtx, + Map<KeyCacheObject, GridCacheDrInfo> drMap, + Object... args); + + /** + * @param cacheCtx Cache context. * @param drMap DR map. * @return Future for asynchronous remove. */
