Now cache plugins are able to unwrap entries passed to EntryProcessor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbe5258b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbe5258b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbe5258b Branch: refs/heads/ignite-2630 Commit: bbe5258b8ffd899ccddc223b5f40632a9f624e40 Parents: a4b922c Author: dkarachentsev <[email protected]> Authored: Wed Mar 30 13:29:36 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Mar 30 13:29:36 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheInvokeEntry.java | 41 +++++++++++++++----- .../processors/cache/CacheLazyEntry.java | 9 ++++- .../processors/cache/GridCacheMapEntry.java | 9 +++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 12 +++--- .../dht/atomic/GridDhtAtomicCache.java | 4 +- .../local/atomic/GridLocalAtomicCache.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 8 ++-- .../cache/transactions/IgniteTxEntry.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 5 +-- .../processors/plugin/CachePluginManager.java | 25 ++++++++++++ .../ignite/plugin/CachePluginProvider.java | 11 ++++++ ...CacheDeploymentCachePluginConfiguration.java | 7 ++++ 12 files changed, 107 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index 2ecfdbf..2526146 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -40,44 +40,53 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta /** Entry version. */ private GridCacheVersion ver; + /** Cache entry instance. */ + private GridCacheEntryEx entry; + /** - * @param cctx Cache context. + * Constructor. + * * @param keyObj Key cache object. * @param valObj Cache object value. * @param ver Entry version. + * @param keepBinary Keep binary flag. + * @param entry Original entry. */ - public CacheInvokeEntry(GridCacheContext cctx, - KeyCacheObject keyObj, + public CacheInvokeEntry(KeyCacheObject keyObj, @Nullable CacheObject valObj, GridCacheVersion ver, - boolean keepBinary + boolean keepBinary, + GridCacheEntryEx entry ) { - super(cctx, keyObj, valObj, keepBinary); + super(entry.context(), keyObj, valObj, keepBinary); this.hadVal = valObj != null; this.ver = ver; + this.entry = entry; } /** - * @param ctx Cache context. * @param keyObj Key cache object. * @param key Key value. * @param valObj Value cache object. * @param val Value. * @param ver Entry version. + * @param keepBinary Keep binary flag. + * @param entry Grid cache entry. */ - public CacheInvokeEntry(GridCacheContext<K, V> ctx, - KeyCacheObject keyObj, + public CacheInvokeEntry(KeyCacheObject keyObj, @Nullable K key, @Nullable CacheObject valObj, @Nullable V val, GridCacheVersion ver, - boolean keepBinary + boolean keepBinary, + GridCacheEntryEx entry ) { - super(ctx, keyObj, key, valObj, val, keepBinary); + super(entry.context(), keyObj, key, valObj, val, keepBinary); this.hadVal = valObj != null || val != null; this.ver = ver; + this.entry = entry; } /** {@inheritDoc} */ @@ -122,12 +131,24 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta return op != Operation.NONE; } + /** + * @return Cache entry instance. + */ + public GridCacheEntryEx entry() { + return entry; + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> cls) { if (cls.isAssignableFrom(CacheEntry.class) && ver != null) return (T)new CacheEntryImplEx<>(getKey(), getValue(), ver); + final T res = cctx.plugin().unwrapCacheEntry(this, cls); + + if (res != null) + return res; + return super.unwrap(cls); } http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java index 6ec17c0..c1fcb77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java @@ -80,7 +80,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { * @param keepBinary Keep binary flag. * @param val Cache value. */ - public CacheLazyEntry(GridCacheContext<K, V> ctx, + public CacheLazyEntry(GridCacheContext ctx, KeyCacheObject keyObj, K key, CacheObject valObj, @@ -136,6 +136,13 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { return key; } + /** + * @return Keep binary flag. + */ + public boolean keepBinary() { + return keepBinary; + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> cls) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c5df29b..08941ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1620,7 +1620,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert entryProcessor != null; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key, old, version(), keepBinary); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1913,7 +1913,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme oldVal = rawGetOrUnmarshalUnlocked(true); - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version(), keepBinary); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), + keepBinary, this); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -2051,7 +2052,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme (EntryProcessor<Object, Object, ?>)writeObj; CacheInvokeEntry<Object, Object> entry = - new CacheInvokeEntry<>(cctx, key, prevVal, version(), keepBinary); + new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this); try { entryProcessor.process(entry, invokeArgs); @@ -2181,7 +2182,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version(), keepBinary); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this); try { Object computed = entryProcessor.process(entry, invokeArgs); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 732c298..445c70a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -360,6 +360,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (evt && txEntry.op() == TRANSFORM) entryProc = F.first(txEntry.entryProcessors()).get1(); + final boolean keepBinary = txEntry.keepBinary(); + CacheObject val = cached.innerGet( tx, /*swap*/true, @@ -373,7 +375,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entryProc, tx.resolveTaskName(), null, - txEntry.keepBinary()); + keepBinary); if (retVal || txEntry.op() == TRANSFORM) { if (!F.isEmpty(txEntry.entryProcessors())) { @@ -389,9 +391,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean modified = false; - for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( - txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary()); + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val, + txEntry.cached().version(), keepBinary, txEntry.cached()); try { EntryProcessor<Object, Object, Object> processor = t.get1(); @@ -435,7 +437,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } else if (retVal) - ret.value(cacheCtx, val, txEntry.keepBinary()); + ret.value(cacheCtx, val, keepBinary); } if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e8a2200..1f5c817 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1656,8 +1656,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Object oldVal = null; Object updatedVal = null; - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old, - entry.version(), req.keepBinary()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(entry.key(), old, + entry.version(), req.keepBinary(), entry); CacheObject updated; http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 8e5fe9e..07b70cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1192,8 +1192,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { Object oldVal = null; - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old, - entry.version(), keepBinary); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(entry.key(), old, + entry.version(), keepBinary, entry); CacheObject updated; Object updatedVal = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f6dfd32..9e5d626 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1260,6 +1260,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ); + final boolean keepBinary = txEntry.keepBinary(); + CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() : txEntry.cached().innerGet(this, /*swap*/false, @@ -1273,7 +1275,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null, resolveTaskName(), null, - txEntry.keepBinary()); + keepBinary); boolean modified = false; @@ -1296,8 +1298,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, ver, txEntry.keepBinary()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>( + txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached()); try { EntryProcessor<Object, Object, Object> processor = t.get1(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index f682605..9060fa7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -691,8 +691,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) { try { - CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val, - ver, keepBinary()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(key, keyVal, cacheVal, val, + ver, keepBinary(), cached()); EntryProcessor processor = t.get1(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 1d77da5..0337145 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2971,9 +2971,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig Object res = null; for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver, - txEntry.keepBinary()); + CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), key0, cacheVal, + val0, ver, txEntry.keepBinary(), txEntry.cached()); EntryProcessor<Object, Object, ?> entryProcessor = t.get1(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java index 23f1f3f..d0efc0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java @@ -38,6 +38,9 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.plugin.CachePluginConfiguration; import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; +import org.jetbrains.annotations.Nullable; + +import javax.cache.Cache; /** * Cache plugin manager. @@ -131,6 +134,28 @@ public class CachePluginManager extends GridCacheManagerAdapter { } /** + * Unwrap entry to specified type. For details see {@code javax.cache.Cache.Entry.unwrap(Class)}. + * + * @param entry Entry to unwrap. + * @param cls Type of the expected component. + * @param <T> Return type. + * @param <K> Key type. + * @param <V> Value type. + * @return New instance of underlying type or {@code null} if it's not available. + */ + @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) + @Nullable public <T, K, V> T unwrapCacheEntry(Cache.Entry<K, V> entry, Class<T> cls) { + for (int i = 0; i < providersList.size(); i++) { + final T res = (T)providersList.get(i).unwrapCacheEntry(entry, cls); + + if (res != null) + return res; + } + + return null; + } + + /** * Validates cache plugin configurations. Throw exception if validation failed. * * @throws IgniteCheckedException If validation failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java index 11550ec..b7ed0b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java @@ -22,6 +22,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.jetbrains.annotations.Nullable; +import javax.cache.Cache; + /** * Cache plugin provider is a point for processing of properties * which provide specific {@link CachePluginConfiguration}. @@ -65,6 +67,15 @@ public interface CachePluginProvider<C extends CachePluginConfiguration> { @Nullable public <T> T createComponent(Class<T> cls); /** + * Unwrap entry to specified type. For details see {@code javax.cache.Cache.Entry.unwrap(Class)}. + * + * @param entry Mutable entry to unwrap. + * @param cls Type of the expected component. + * @return New instance of underlying type or {@code null} if it's not available. + */ + @Nullable public <T, K, V> T unwrapCacheEntry(Cache.Entry<K, V> entry, Class<T> cls); + + /** * Validates cache plugin configuration in process of cache creation. Throw exception if validation failed. * * @throws IgniteCheckedException If validation failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java index bb37c25..ff2e674 100644 --- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java @@ -25,6 +25,8 @@ import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; import org.jetbrains.annotations.Nullable; +import javax.cache.Cache; + /** * Test cache plugin configuration for cache deployment tests. */ @@ -41,6 +43,11 @@ public class CacheDeploymentCachePluginConfiguration<K, V> implements CachePlugi } /** {@inheritDoc} */ + @Nullable @Override public Object unwrapCacheEntry(final Cache.Entry mutableEntry, final Class cls) { + return null; + } + + /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { }
