Repository: incubator-ignite Updated Branches: refs/heads/ignite-9469 [created] b9ba2bb7c
GG-9469 - Portable mode in store Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/641a7488 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/641a7488 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/641a7488 Branch: refs/heads/ignite-9469 Commit: 641a7488f1c0939b94ff6ec5877021e48e9eb4dd Parents: 984373d Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Dec 10 17:56:23 2014 +0300 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Dec 10 17:56:23 2014 +0300 ---------------------------------------------------------------------- .../grid/cache/GridCacheConfiguration.java | 40 ++++++ .../grid/cache/store/GridCacheStore.java | 23 ++- .../processors/cache/GridCacheAdapter.java | 36 ++--- .../processors/cache/GridCacheContext.java | 50 ++++--- .../processors/cache/GridCacheProcessor.java | 10 +- .../processors/cache/GridCacheStoreManager.java | 143 ++++++++++++++----- .../cache/GridCacheTxLocalAdapter.java | 17 ++- .../cache/GridCacheWriteBehindStore.java | 5 + .../cache/distributed/dht/GridDhtGetFuture.java | 2 +- .../dht/GridPartitionedGetFuture.java | 18 +-- .../dht/atomic/GridDhtAtomicCache.java | 9 +- .../dht/colocated/GridDhtColocatedCache.java | 11 +- .../distributed/near/GridNearGetFuture.java | 13 +- .../processors/interop/GridInteropAware.java | 7 + .../dataload/GridDataLoaderImplSelfTest.java | 96 +------------ 15 files changed, 268 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java index f2c5682..07d1628 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/GridCacheConfiguration.java @@ -336,6 +336,9 @@ public class GridCacheConfiguration { /** */ private boolean portableEnabled; + /** */ + private boolean keepPortableInStore = true; + /** Query configuration. */ private GridCacheQueryConfiguration qryCfg; @@ -379,6 +382,7 @@ public class GridCacheConfiguration { indexingSpiName = cc.getIndexingSpiName(); interceptor = cc.getInterceptor(); invalidate = cc.isInvalidate(); + keepPortableInStore = cc.isKeepPortableInStore(); offHeapMaxMem = cc.getOffHeapMaxMemory(); maxConcurrentAsyncOps = cc.getMaxConcurrentAsyncOperations(); maxQryIterCnt = cc.getMaximumQueryIteratorCount(); @@ -1757,6 +1761,42 @@ public class GridCacheConfiguration { } /** + * Flag indicating that {@link GridCacheStore} implementation + * is working with portable objects instead of Java objects + * if portable mode for this cache is enabled ({@link #isPortableEnabled()} + * flag is {@code true}). Default value of this flag is {@code true}, + * because this is recommended behavior from performance standpoint. + * <p> + * If set to {@code false}, GridGain will deserialize keys and + * values stored in portable format before they are passed + * to cache store. + * <p> + * Note that setting this flag to {@code false} can simplify + * store implementation in some cases, but it can cause performance + * degradation due to additional serializations and deserializations + * of portable objects. You will also need to have key and value + * classes on all nodes since portables will be deserialized when + * store is called. + * <p> + * This flag is ignored if portable mode is disabled for this + * cache ({@link #isPortableEnabled()} flag is {@code false}). + * + * @return Keep portables in store flag. + */ + public boolean isKeepPortableInStore() { + return keepPortableInStore; + } + + /** + * Sets keep portables in store flag. + * + * @param keepPortableInStore Keep portables in store flag. + */ + public void setKeepPortableInStore(boolean keepPortableInStore) { + this.keepPortableInStore = keepPortableInStore; + } + + /** * Gets query configuration. Query configuration defines which fields should be indexed for objects * without annotations or portable objects. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStore.java b/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStore.java index ba1be4f..3bebe9f 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/store/GridCacheStore.java @@ -10,6 +10,7 @@ package org.gridgain.grid.cache.store; import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.store.jdbc.*; @@ -57,12 +58,12 @@ import java.util.Date; * </pre> * <h1 class="header">Working With Portable Objects</h1> * When portables are enabled for cache by setting {@link GridCacheConfiguration#isPortableEnabled()} to - * {@code true}), all portable keys and values are converted to instances of {@link org.apache.ignite.portables.PortableObject}. + * {@code true}), all portable keys and values are converted to instances of {@link PortableObject}. * Therefore, all cache store methods will take parameters in portable format. To avoid class * cast exceptions, store must have signature compatible with portables. E.g., if you use {@link Integer} * as a key and {@code Value} class as a value (which will be converted to portable format), cache store * signature should be the following: - * <pre> + * <pre name="code" class="java"> * public class PortableCacheStore implements GridCacheStore<Integer, GridPortableObject> { * public void put(@Nullable GridCacheTx tx, Integer key, GridPortableObject val) throws GridException { * ... @@ -71,7 +72,23 @@ import java.util.Date; * ... * } * </pre> - * Note that only portable classes are converted to {@link org.apache.ignite.portables.PortableObject} format. Following + * This behavior can be overridden by setting {@link GridCacheConfiguration#setKeepPortableInStore(boolean)} + * flag value to {@code false}. In this case, GridGain will deserialize keys and values stored in portable + * format before they are passed to cache store, so that you can use the following cache store signature instead: + * <pre name="code" class="java"> + * public class ObjectsCacheStore implements GridCacheStore<Integer, Person> { + * public void put(@Nullable GridCacheTx tx, Integer key, Person val) throws GridException { + * ... + * } + * + * ... + * } + * </pre> + * Note that while this can simplify store implementation in some cases, it will cause performance degradation + * due to additional serializations and deserializations of portable objects. You will also need to have key + * and value classes on all nodes since portables will be deserialized when store is invoked. + * <p> + * Note that only portable classes are converted to {@link PortableObject} format. Following * types are stored in cache without changes and therefore should not affect cache store signature: * <ul> * <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java index 31cd964..e4b86c2 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java @@ -15,6 +15,7 @@ import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.*; import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.security.*; import org.apache.ignite.portables.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; @@ -25,13 +26,11 @@ import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.affinity.*; import org.gridgain.grid.kernal.processors.cache.datastructures.*; -import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.kernal.processors.dr.*; import org.gridgain.grid.kernal.processors.task.*; -import org.apache.ignite.plugin.security.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; @@ -48,12 +47,11 @@ import java.util.concurrent.locks.*; import static java.util.Collections.*; import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.events.IgniteEventType.*; import static org.gridgain.grid.cache.GridCacheFlag.*; import static org.gridgain.grid.cache.GridCachePeekMode.*; import static org.gridgain.grid.cache.GridCacheTxConcurrency.*; import static org.gridgain.grid.cache.GridCacheTxIsolation.*; -import static org.gridgain.grid.cache.GridCacheTxState.*; -import static org.apache.ignite.events.IgniteEventType.*; import static org.gridgain.grid.kernal.GridClosureCallMode.*; import static org.gridgain.grid.kernal.processors.dr.GridDrType.*; import static org.gridgain.grid.kernal.processors.task.GridTaskThreadContextKey.*; @@ -617,8 +615,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im if (peek != null) { V v = peek.get(); - if (ctx.portableEnabled() && !ctx.keepPortable() && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (ctx.portableEnabled()) + v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); return F.t(ctx.cloneOnFlag(v)); } @@ -632,8 +630,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im if (peek != null) { V v = peek.get(); - if (ctx.portableEnabled() && !ctx.keepPortable() && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (ctx.portableEnabled()) + v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); return F.t(ctx.cloneOnFlag(v)); } @@ -1739,8 +1737,6 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im if (key == null) continue; - K key0 = null; - while (true) { GridCacheEntryEx<K, V> entry; @@ -1749,12 +1745,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im cached = null; } - else { - if (key0 == null) - key0 = ctx.portableEnabled() ? (K)ctx.marshalToPortable(key) : key; - - entry = entryEx(key0); - } + else + entry = entryEx(key); try { V val = entry.innerGet(null, @@ -3452,8 +3444,8 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im V val = unswapped.value(); - if (ctx.portableEnabled() && deserializePortable && val instanceof PortableObject) - return (V)((PortableObject)val).deserialize(); + if (ctx.portableEnabled()) + return (V)ctx.unwrapPortableIfNeeded(val, !deserializePortable); else return ctx.cloneOnFlag(val); } @@ -4463,6 +4455,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter) { String taskName = ctx.kernalContext().job().currentTaskName(); + if (ctx.portableEnabled() && !F.isEmpty(keys)) { + keys = F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)ctx.marshalToPortable(k); + } + }); + } + return getAllAsync(keys, ctx.hasFlag(GET_PRIMARY), /*skip tx*/false, null, null, taskName, deserializePortable, filter); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index f7e4471..d0800f5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -1602,34 +1602,31 @@ public class GridCacheContext<K, V> implements Externalizable { if (col instanceof ArrayList) return unwrapPortables((ArrayList<Object>)col); - int idx = 0; + Collection<Object> col0 = new ArrayList<>(col.size()); - for (Object obj : col) { - Object unwrapped = unwrapPortable(obj); + for (Object obj : col) + col0.add(unwrapPortable(obj)); - if (obj != unwrapped) { - Collection<Object> unwrappedCol = new ArrayList<>(col.size()); - - int idx0 = 0; - - for (Object obj0 : col) { - if (idx0 < idx) - unwrappedCol.add(obj0); - else if (idx == idx0) - unwrappedCol.add(unwrapped); - else - unwrappedCol.add(unwrapPortable(obj0)); + return col0; + } - idx0++; - } + /** + * Unwraps map. + * + * @param map Map to unwrap. + * @param keepPortable Keep portable flag. + * @return Unwrapped collection. + */ + public Map<Object, Object> unwrapPortablesIfNeeded(Map<Object, Object> map, boolean keepPortable) { + if (keepPortable || !config().isPortableEnabled()) + return map; - return unwrappedCol; - } + Map<Object, Object> map0 = U.newHashMap(map.size()); - idx++; - } + for (Map.Entry<Object, Object> e : map.entrySet()) + map0.put(unwrapPortable(e.getKey()), unwrapPortable(e.getValue())); - return col; + return map0; } /** @@ -1662,7 +1659,12 @@ public class GridCacheContext<K, V> implements Externalizable { */ @SuppressWarnings("IfMayBeConditional") public Object unwrapPortableIfNeeded(Object o, boolean keepPortable) { - if (keepPortable || !config().isPortableEnabled()) + assert !portableEnabled() || o == null || U.isPortableOrCollectionType(o.getClass()); + + if (o == null) + return null; + + if (keepPortable || !portableEnabled()) return o; return unwrapPortable(o); @@ -1699,6 +1701,8 @@ public class GridCacheContext<K, V> implements Externalizable { else { if (o instanceof Collection) return unwrapPortablesIfNeeded((Collection<Object>)o, false); + else if (o instanceof Map) + return unwrapPortablesIfNeeded((Map<Object, Object>)o, false); else if (o instanceof PortableObject) return ((PortableObject)o).deserialize(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java index 8f63358..fd25eac 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java @@ -299,11 +299,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { " 'true' [cacheName=" + cc.getName() + ']'); } - IgniteConfiguration cfg = ctx.config(); + IgniteDeploymentMode depMode = c.getDeploymentMode(); - IgniteDeploymentMode depMode = cfg.getDeploymentMode(); - - if (cfg.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) && + if (c.isPeerClassLoadingEnabled() && (depMode == PRIVATE || depMode == ISOLATED) && !CU.isSystemCache(cc.getName())) throw new GridException("Cannot start cache in PRIVATE or ISOLATED deployment mode: " + ctx.config().getDeploymentMode()); @@ -399,6 +397,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cc.getAtomicityMode() == ATOMIC) assertParameter(cc.getTransactionManagerLookupClassName() == null, "transaction manager can not be used with ATOMIC cache"); + + if (cc.isPortableEnabled() && !ctx.isEnterprise()) + throw new GridException("Portable mode for cache is supported only in Enterprise edition " + + "(set 'portableEnabled' property to 'false') [cacheName=" + cc.getName() + ']'); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java index 3fdbb37..c65ab7e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java @@ -14,6 +14,7 @@ import org.apache.ignite.lifecycle.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.store.*; +import org.gridgain.grid.kernal.processors.interop.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -34,12 +35,16 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { /** */ private final boolean locStore; + /** */ + private boolean convertPortable; + /** * @param store Store. */ @SuppressWarnings("unchecked") public GridCacheStoreManager(@Nullable GridCacheStore<K, Object> store) { this.store = store; + singleThreadGate = store == null ? null : new GridCacheStoreBalancingWrapper<>(store); if (store instanceof GridCacheWriteBehindStore) @@ -61,6 +66,18 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { ((LifecycleAware)store).start(); } } + + if (!cctx.config().isKeepPortableInStore()) { + if (cctx.config().isPortableEnabled()) { + if (store instanceof GridInteropAware) + ((GridInteropAware)store).configure(true); + else + convertPortable = true; + } + else + U.warn(log, "GridCacheConfiguration.isKeepPortableInStore() configuration property will " + + "be ignored because portable mode is not enabled for cache: " + cctx.namex()); + } } /** {@inheritDoc} */ @@ -107,13 +124,16 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { */ @Nullable public V loadFromStore(@Nullable GridCacheTx tx, K key) throws GridException { if (store != null) { - if (log.isDebugEnabled()) - log.debug("Loading value from store for key: " + key); - if (key instanceof GridCacheInternal) // Never load internal keys from store as they are never persisted. return null; + if (convertPortable) + key = (K)cctx.unwrapPortableIfNeeded(key, false); + + if (log.isDebugEnabled()) + log.debug("Loading value from store for key: " + key); + V val = null; try { @@ -126,7 +146,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Loaded value from store [key=" + key + ", val=" + val + ']'); - return val; + return cctx.portableEnabled() ? (V)cctx.marshalToPortable(val) : val; } return null; @@ -164,9 +184,6 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { public boolean loadAllFromStore(@Nullable GridCacheTx tx, Collection<? extends K> keys, final IgniteBiInClosure<K, V> vis) throws GridException { if (store != null) { - if (log.isDebugEnabled()) - log.debug("Loading values from store for keys: " + keys); - if (!keys.isEmpty()) { if (keys.size() == 1) { K key = F.first(keys); @@ -176,10 +193,30 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { return true; } + Collection<? extends K> keys0; + + keys0 = convertPortable ? + F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)cctx.unwrapPortableIfNeeded(k, false); + } + }) : + keys; + + if (log.isDebugEnabled()) + log.debug("Loading values from store for keys: " + keys0); + try { - singleThreadGate.loadAll(tx, keys, new CI2<K, Object>() { - @Override public void apply(K k, Object v) { - vis.apply(k, convert(v)); + singleThreadGate.loadAll(tx, keys0, new CI2<K, Object>() { + @Override public void apply(K k, Object o) { + V v = convert(o); + + if (cctx.portableEnabled()) { + k = (K)cctx.marshalToPortable(k); + v = (V)cctx.marshalToPortable(v); + } + + vis.apply(k, v); } }); } @@ -189,10 +226,10 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { catch (GridRuntimeException e) { throw U.cast(e); } - } - if (log.isDebugEnabled()) - log.debug("Loaded values from store for keys: " + keys); + if (log.isDebugEnabled()) + log.debug("Loaded values from store for keys: " + keys0); + } return true; } @@ -265,20 +302,23 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { public boolean putToStore(@Nullable GridCacheTx tx, K key, V val, GridCacheVersion ver) throws GridException { if (store != null) { + // Never persist internal keys. + if (key instanceof GridCacheInternal) + return true; + + if (convertPortable) { + key = (K)cctx.unwrapPortableIfNeeded(key, false); + val = (V)cctx.unwrapPortableIfNeeded(val, false); + } + if (log.isDebugEnabled()) log.debug("Storing value in cache store [key=" + key + ", val=" + val + ']'); - if (key instanceof GridCacheInternal) { - // Never persist internal keys. - return true; + try { + store.put(tx, key, locStore ? F.t(val, ver) : val); } - else { - try { - store.put(tx, key, locStore ? F.t(val, ver) : val); - } - catch (ClassCastException e) { - handleClassCastException(e); - } + catch (ClassCastException e) { + handleClassCastException(e); } if (log.isDebugEnabled()) @@ -298,8 +338,8 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { * @return {@code True} if there is a persistent storage. * @throws GridException If storage failed. */ - public boolean putAllToStore(@Nullable GridCacheTx tx, - Map<K, IgniteBiTuple<V, GridCacheVersion>> map) throws GridException { + public boolean putAllToStore(@Nullable GridCacheTx tx, Map<K, IgniteBiTuple<V, GridCacheVersion>> map) + throws GridException { if (F.isEmpty(map)) return true; @@ -310,15 +350,30 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } else { if (store != null) { + Map<K, IgniteBiTuple<V, GridCacheVersion>> map0; + + if (convertPortable) { + map0 = U.newHashMap(map.size()); + + for (Map.Entry<K, IgniteBiTuple<V, GridCacheVersion>> e : map.entrySet()) { + IgniteBiTuple<V, GridCacheVersion> t = e.getValue(); + + map0.put((K)cctx.unwrapPortableIfNeeded(e.getKey(), false), + F.t((V)cctx.unwrapPortableIfNeeded(t.get1(), false), t.get2())); + } + } + else + map0 = map; + if (log.isDebugEnabled()) - log.debug("Storing values in cache store [map=" + map + ']'); + log.debug("Storing values in cache store [map=" + map0 + ']'); try { - store.putAll(tx, locStore ? map : F.viewReadOnly(map, + store.putAll(tx, locStore ? map0 : F.viewReadOnly(map0, new C1<IgniteBiTuple<V, GridCacheVersion>, Object>() { - @Override public Object apply(IgniteBiTuple<V, GridCacheVersion> t) { - return t.get1(); - } + @Override public Object apply(IgniteBiTuple<V, GridCacheVersion> t) { + return t.get1(); + } })); } catch (ClassCastException e) { @@ -326,7 +381,7 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } if (log.isDebugEnabled()) - log.debug("Stored value in cache store [map=" + map + ']'); + log.debug("Stored value in cache store [map=" + map0 + ']'); return true; } @@ -343,12 +398,16 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { */ public boolean removeFromStore(@Nullable GridCacheTx tx, K key) throws GridException { if (store != null) { + // Never remove internal key from store as it is never persisted. + if (key instanceof GridCacheInternal) + return false; + + if (convertPortable) + key = (K)cctx.unwrapPortableIfNeeded(key, false); + if (log.isDebugEnabled()) log.debug("Removing value from cache store [key=" + key + ']'); - if (key instanceof GridCacheInternal) - // Never remove internal key from store as it is never persisted. - return false; else { try { store.remove(tx, key); @@ -384,18 +443,28 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> { } if (store != null) { + Collection<? extends K> keys0; + + keys0 = convertPortable ? + F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)cctx.unwrapPortableIfNeeded(k, false); + } + }) : + keys; + if (log.isDebugEnabled()) - log.debug("Removing values from cache store [keys=" + keys + ']'); + log.debug("Removing values from cache store [keys=" + keys0 + ']'); try { - store.removeAll(tx, keys); + store.removeAll(tx, keys0); } catch (ClassCastException e) { handleClassCastException(e); } if (log.isDebugEnabled()) - log.debug("Removed values from cache store [keys=" + keys + ']'); + log.debug("Removed values from cache store [keys=" + keys0 + ']'); return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java index 93ed43f..73151d5 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java @@ -1051,8 +1051,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K if (val != null) { V val0 = val; - if (cacheCtx.portableEnabled() && deserializePortable && val instanceof PortableObject) - val0 = ((PortableObject)val).deserialize(); + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); map.put(key, val0); } @@ -1090,8 +1090,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K V val0 = val; - if (cacheCtx.portableEnabled() && deserializePortable && val instanceof PortableObject) - val0 = ((PortableObject)val).deserialize(); + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); map.put(key, val0); } @@ -1156,8 +1156,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K if (val != null) { V val0 = val; - if (cacheCtx.portableEnabled() && deserializePortable && val instanceof PortableObject) - val0 = ((PortableObject)val).deserialize(); + if (cacheCtx.portableEnabled()) + val0 = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); map.put(key, val0); } @@ -1510,9 +1510,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K val = clos.apply(val); } - if (cacheCtx.portableEnabled() && deserializePortable && - val instanceof PortableObject) - val = ((PortableObject)val).deserialize(); + if (cacheCtx.portableEnabled()) + val = (V)cacheCtx.unwrapPortableIfNeeded(val, !deserializePortable); retMap.put(key, val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java index b39d725..589187d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheWriteBehindStore.java @@ -280,6 +280,11 @@ public class GridCacheWriteBehindStore<K, V> implements GridCacheStore<K, V>, Li } /** {@inheritDoc} */ + @Override public void configure(Object... params) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void initialize(GridKernalContext ctx) throws GridException { if (store instanceof GridInteropAware) ((GridInteropAware)store).initialize(ctx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java index e649265..6db3540 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -261,7 +261,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @return Future for local get. */ @SuppressWarnings( {"unchecked", "IfMayBeConditional"}) - private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { + private IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> getAsync(final LinkedHashMap<? extends K, Boolean> keys) { if (F.isEmpty(keys)) return new GridFinishedFuture<Collection<GridCacheEntryInfo<K, V>>>(cctx.kernalContext(), Collections.<GridCacheEntryInfo<K, V>>emptyList()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index d342bb3..c755a86 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -12,16 +12,15 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.apache.ignite.portables.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; @@ -416,8 +415,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M colocated.removeIfObsolete(key); } else { - if (cctx.portableEnabled() && deserializePortable && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (cctx.portableEnabled()) + v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); locVals.put(key, v); @@ -497,12 +496,15 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M for (GridCacheEntryInfo<K, V> info : infos) { info.unmarshalValue(cctx, cctx.deploy().globalLoader()); + K key = info.key(); V val = info.value(); - if (cctx.portableEnabled() && deserializePortable && val instanceof PortableObject) - val = ((PortableObject)val).deserialize(); + if (cctx.portableEnabled()) { + key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); + val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); + } - map.put(info.key(), val); + map.put(key, val); } return map; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index a6bc796..8e1954c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -11,7 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.apache.ignite.portables.*; +import org.apache.ignite.plugin.security.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.managers.communication.*; @@ -21,7 +21,6 @@ import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import org.gridgain.grid.kernal.processors.cache.dr.*; import org.gridgain.grid.kernal.processors.timeout.*; -import org.apache.ignite.plugin.security.*; import org.gridgain.grid.util.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; @@ -743,8 +742,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { success = false; } else { - if (ctx.portableEnabled() && deserializePortable && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (ctx.portableEnabled() && deserializePortable) { + key = (K)ctx.unwrapPortableIfNeeded(key, false); + v = (V)ctx.unwrapPortableIfNeeded(v, false); + } locVals.put(key, v); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 96d837c..59dddd7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -11,18 +11,17 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht.colocated; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.apache.ignite.portables.*; +import org.apache.ignite.plugin.security.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.apache.ignite.plugin.security.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; @@ -261,8 +260,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte success = false; } else { - if (ctx.portableEnabled() && deserializePortable && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (ctx.portableEnabled()) + v = (V)ctx.unwrapPortableIfNeeded(v, !deserializePortable); locVals.put(key, v); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java index 2050742..61e9631 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearGetFuture.java @@ -12,17 +12,16 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.apache.ignite.portables.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; import org.gridgain.grid.kernal.processors.timeout.*; import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.io.*; @@ -457,8 +456,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } if (v != null && !reload) { - if (cctx.portableEnabled() && deserializePortable && v instanceof PortableObject) - v = ((PortableObject)v).deserialize(); + if (cctx.portableEnabled()) + v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key, v))); } @@ -587,8 +586,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma V val = info.value(); - if (cctx.portableEnabled() && deserializePortable && val instanceof PortableObject) - val = ((PortableObject)val).deserialize(); + if (cctx.portableEnabled()) + val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); map.put(info.key(), val); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropAware.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropAware.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropAware.java index 1663345..ee2fbe9 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropAware.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/interop/GridInteropAware.java @@ -17,6 +17,13 @@ import org.gridgain.grid.kernal.*; */ public interface GridInteropAware { /** + * Sets configuration parameters. + * + * @param params Configuration parameters. + */ + public void configure(Object... params); + + /** * Initializes interop-aware component. * * @param ctx Context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/641a7488/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java index 8891c1e..d9ff4ec 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/dataload/GridDataLoaderImplSelfTest.java @@ -13,11 +13,10 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.portables.*; -import org.gridgain.grid.cache.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.cache.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.gridgain.testframework.junits.common.*; @@ -42,9 +41,6 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { /** Started grid counter. */ private static int cnt; - /** Flag indicating should be cache configured with portables or not. */ - private static boolean portables; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -54,15 +50,6 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { cfg.setDiscoverySpi(discoSpi); - if (portables) { - PortableConfiguration portableCfg = new PortableConfiguration(); - - portableCfg.setTypeConfigurations(Arrays.asList( - new PortableTypeConfiguration(TestObject.class.getName()))); - - cfg.setPortableConfiguration(portableCfg); - } - // Forth node goes without cache. if (cnt < 4) cfg.setCacheConfiguration(cacheConfiguration()); @@ -128,7 +115,6 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { public void testAddDataFromMap() throws Exception { try { cnt = 0; - portables = false; startGrids(2); @@ -170,71 +156,10 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { } /** - * Data loader should correctly load portable entries from HashMap in case of grids with more than one node - * and with GridOptimizedMarshaller that requires serializable. + * Gets cache configuration. * - * @throws Exception If failed. + * @return Cache configuration. */ - public void testAddPortableDataFromMap() throws Exception { - try { - cnt = 0; - portables = true; - - startGrids(2); - - Ignite g0 = grid(0); - - IgniteMarshaller marsh = g0.configuration().getMarshaller(); - - if (marsh instanceof IgniteOptimizedMarshaller) - assertTrue(((IgniteOptimizedMarshaller)marsh).isRequireSerializable()); - else - fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName()); - - IgniteDataLoader<Integer, TestObject> dataLdr = g0.dataLoader(null); - - Map<Integer, TestObject> map = U.newHashMap(KEYS_COUNT); - - for (int i = 0; i < KEYS_COUNT; i ++) - map.put(i, new TestObject(i)); - - dataLdr.addData(map); - - dataLdr.close(); - - Random rnd = new Random(); - - GridCache<Integer, TestObject> c = g0.cache(null); - - for (int i = 0; i < 100; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - TestObject v = c.get(k); - - assertEquals(k, v.val()); - } - - GridCacheProjection<Integer, TestObject> c2 = c.keepPortable(); - - for (int i = 0; i < 100; i ++) { - Integer k = rnd.nextInt(KEYS_COUNT); - - TestObject v = c2.get(k); - - assertEquals(k, v.val()); - } - - } - finally { - G.stopAll(true); - } - } - - /** - * Gets cache configuration. - * - * @return Cache configuration. - */ private GridCacheConfiguration cacheConfiguration() { GridCacheConfiguration cacheCfg = defaultCacheConfiguration(); @@ -242,16 +167,13 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { cacheCfg.setBackups(1); cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - if (portables) - cacheCfg.setPortableEnabled(true); - return cacheCfg; } /** * */ - private static class TestObject implements PortableMarshalAware, Serializable { + private static class TestObject implements Serializable { /** */ private int val; @@ -281,15 +203,5 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest { @Override public boolean equals(Object obj) { return obj instanceof TestObject && ((TestObject)obj).val == val; } - - /** {@inheritDoc} */ - @Override public void writePortable(PortableWriter writer) throws PortableException { - writer.writeInt("val", val); - } - - /** {@inheritDoc} */ - @Override public void readPortable(PortableReader reader) throws PortableException { - val = reader.readInt("val"); - } } }