Repository: incubator-ignite Updated Branches: refs/heads/ignite-45 65d22d586 -> c83d5a160
# Bug fix: GridCacheProcessor.publicCache() doesn't return dynamically created caches. Also GridCacheProcessor was simplified. (cherry picked from commit f08510a) Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c83d5a16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c83d5a16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c83d5a16 Branch: refs/heads/ignite-45 Commit: c83d5a160b5599c34baa6814210fc7abbb344db1 Parents: 65d22d5 Author: sevdokimov <[email protected]> Authored: Thu Mar 12 14:29:22 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Thu Mar 12 15:56:43 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 53 +++++------ .../processors/cache/IgniteCacheProxy.java | 94 ++++++++++---------- 2 files changed, 75 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c83d5a16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 758a4f8..0dd69cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -85,13 +85,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { private final Map<String, GridCacheAdapter<?, ?>> caches; /** Map of proxies. */ - private final Map<String, GridCache<?, ?>> proxies; - - /** Map of proxies. 
*/ private final Map<String, IgniteCacheProxy<?, ?>> jCacheProxies; /** Map of public proxies, i.e. proxies which could be returned to the user. */ - private final Map<String, GridCache<?, ?>> publicProxies; + private volatile List<GridCache<?, ?>> publicProxies; /** Map of preload finish futures grouped by preload order. */ private final NavigableMap<Integer, IgniteInternalFuture<?>> preloadFuts; @@ -127,8 +124,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { super(ctx); caches = new LinkedHashMap<>(); - proxies = new HashMap<>(); - publicProxies = new HashMap<>(); jCacheProxies = new HashMap<>(); preloadFuts = new TreeMap<>(); @@ -639,8 +634,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { GridCacheAdapter cache = e.getValue(); - proxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); - jCacheProxies.put(e.getKey(), new IgniteCacheProxy(cache.context(), cache, null, false)); } @@ -696,8 +689,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { startCache(cache); - proxies.put(name, new GridCacheProxyImpl(ctx, cache, null)); - jCacheProxies.put(name, new IgniteCacheProxy(ctx, cache, null, false)); } } @@ -708,14 +699,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { } }); - // Internal caches which should not be returned to user. - for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { - GridCacheAdapter cache = e.getValue(); - - if (!sysCaches.contains(e.getKey())) - publicProxies.put(e.getKey(), new GridCacheProxyImpl(cache.context(), cache, null)); - } - // Must call onKernalStart on shared managers after creation of fetched caches. 
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) mgr.onKernalStart(); @@ -769,9 +752,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (ctx.config().isDaemon()) return; - for (String cacheName : stopSeq) { + for (String cacheName : stopSeq) stopCache(caches.get(cacheName), cancel); - } List<? extends GridCacheSharedManager<?, ?>> mgrs = sharedCtx.managers(); @@ -1991,14 +1973,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Getting cache for name: " + name); - return (GridCache<K, V>)proxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + + return jcache == null ? null : jcache.legacyProxy(); } /** * @return All configured cache instances. */ public Collection<GridCache<?, ?>> caches() { - return proxies.values(); + return F.viewReadOnly(jCacheProxies.values(), new IgniteClosure<IgniteCacheProxy<?, ?>, GridCache<?, ?>>() { + @Override public GridCache<?, ?> apply(IgniteCacheProxy<?, ?> entries) { + return entries.legacyProxy(); + } + }); } /** @@ -2060,12 +2048,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (sysCaches.contains(name)) throw new IllegalStateException("Failed to get cache because it is system cache: " + name); - GridCache<K, V> cache = (GridCache<K, V>)publicProxies.get(name); + IgniteCacheProxy<K, V> jcache = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); - if (cache == null) + if (jcache == null) throw new IllegalArgumentException("Cache is not configured: " + name); - return cache; + return jcache.legacyProxy(); } /** @@ -2129,7 +2117,20 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return All configured public cache instances. 
*/ public Collection<GridCache<?, ?>> publicCaches() { - return publicProxies.values(); + List<GridCache<?, ?>> res = publicProxies; + + if (res == null) { + res = new ArrayList<>(jCacheProxies.size()); + + for (IgniteCacheProxy<?, ?> proxy : jCacheProxies.values()) { + if (!sysCaches.contains(proxy.getName())) + res.add(proxy.legacyProxy()); + } + + publicProxies = res; + } + + return res; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c83d5a16/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 45093cc..17af9e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -63,6 +63,10 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** Projection. */ private GridCacheProjectionImpl<K, V> prj; + /** */ + @GridToStringExclude + private GridCacheProxyImpl<K, V> legacyProxy; + /** * Empty constructor required for {@link Externalizable}. 
*/ @@ -92,6 +96,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V this.prj = prj; gate = ctx.gate(); + + legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj); } /** @@ -258,8 +264,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } - private IgniteBiPredicate<K,V> acceptAll() { - return new IgniteBiPredicate<K,V>() { + private IgniteBiPredicate<K, V> acceptAll() { + return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { return true; } @@ -272,12 +278,12 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @return Cursor. */ @SuppressWarnings("unchecked") - private QueryCursor<Entry<K,V>> doQuery(Query filter, @Nullable ClusterGroup grp) { - final CacheQuery<Map.Entry<K,V>> qry; - final CacheQueryFuture<Map.Entry<K,V>> fut; + private QueryCursor<Entry<K, V>> doQuery(Query filter, @Nullable ClusterGroup grp) { + final CacheQuery<Map.Entry<K, V>> qry; + final CacheQueryFuture<Map.Entry<K, V>> fut; if (filter instanceof ScanQuery) { - IgniteBiPredicate<K,V> p = ((ScanQuery)filter).getFilter(); + IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); qry = delegate.queries().createScanQuery(p != null ? p : acceptAll()); @@ -323,8 +329,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V else throw new IgniteException("Unsupported query predicate: " + filter); - return new QueryCursorImpl<>(new ClIter<Map.Entry<K,V>,Cache.Entry<K,V>>(fut) { - @Override protected Cache.Entry<K,V> convert(Map.Entry<K,V> e) { + return new QueryCursorImpl<>(new ClIter<Map.Entry<K, V>, Cache.Entry<K, V>>(fut) { + @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } }); @@ -337,7 +343,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param loc Local flag. * @return Initial iteration cursor. 
*/ - private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { + private QueryCursor<Entry<K, V>> queryContinuous(ContinuousQuery<K, V> qry, boolean loc) { if (qry.getInitialQuery() instanceof ContinuousQuery) throw new IgniteException("Initial predicate for continuous query can't be an instance of another " + "continuous query. Use SCAN or SQL query for initial iteration."); @@ -398,7 +404,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public QueryCursor<Entry<K,V>> query(Query qry) { + @Override public QueryCursor<Entry<K, V>> query(Query qry) { A.notNull(qry, "qry"); GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -431,7 +437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { validate(qry); - CacheQuery<List<?>> q = ((GridCacheQueriesEx<K,V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(), false); + CacheQuery<List<?>> q = ((GridCacheQueriesEx<K, V>)delegate.queries()).createSqlFieldsQuery(qry.getSql(), false); if (qry.getPageSize() > 0) q.pageSize(qry.getPageSize()); @@ -459,8 +465,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V * @param p Query. * @return Cursor. 
*/ - private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) { - return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( + private QueryCursor<Entry<K, V>> doLocalQuery(SqlQuery p) { + return new QueryCursorImpl<>(ctx.kernalContext().query().<K, V>queryLocal( ctx.name(), p.getType(), p.getSql(), p.getArgs())); } @@ -486,7 +492,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) { + @Override public QueryCursor<Entry<K, V>> localQuery(Query qry) { A.notNull(qry, "qry"); GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -498,7 +504,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return doLocalQuery((SqlQuery)qry); if (qry instanceof ContinuousQuery) - return queryContinuous((ContinuousQuery<K,V>)qry, true); + return queryContinuous((ContinuousQuery<K, V>)qry, true); return doQuery(qry, projection(true)); } @@ -1144,8 +1150,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, - EntryProcessor<K, V, T> entryProcessor, - Object... args) { + EntryProcessor<K, V, T> entryProcessor, + Object... args) { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1169,8 +1175,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** {@inheritDoc} */ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, - IgniteEntryProcessor<K, V, T> entryProcessor, - Object... args) { + IgniteEntryProcessor<K, V, T> entryProcessor, + Object... 
args) { try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -1317,39 +1323,25 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * Creates projection that will operate with portable objects. - * <p> - * Projection returned by this method will force cache not to deserialize portable objects, - * so keys and values will be returned from cache API methods without changes. Therefore, - * signature of the projection can contain only following types: - * <ul> - * <li>{@code PortableObject} for portable classes</li> - * <li>All primitives (byte, int, ...) and there boxed versions (Byte, Integer, ...)</li> - * <li>Arrays of primitives (byte[], int[], ...)</li> - * <li>{@link String} and array of {@link String}s</li> - * <li>{@link UUID} and array of {@link UUID}s</li> - * <li>{@link Date} and array of {@link Date}s</li> - * <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> - * <li>Enums and array of enums</li> - * <li> - * Maps, collections and array of objects (but objects inside - * them will still be converted if they are portable) - * </li> - * </ul> - * <p> - * For example, if you use {@link Integer} as a key and {@code Value} class as a value - * (which will be stored in portable format), you should acquire following projection - * to avoid deserialization: + * Creates projection that will operate with portable objects. <p> Projection returned by this method will force + * cache not to deserialize portable objects, so keys and values will be returned from cache API methods without + * changes. Therefore, signature of the projection can contain only following types: <ul> <li>{@code PortableObject} + * for portable classes</li> <li>All primitives (byte, int, ...) 
and there boxed versions (Byte, Integer, ...)</li> + * <li>Arrays of primitives (byte[], int[], ...)</li> <li>{@link String} and array of {@link String}s</li> + * <li>{@link UUID} and array of {@link UUID}s</li> <li>{@link Date} and array of {@link Date}s</li> <li>{@link + * java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li> <li>Enums and array of enums</li> <li> Maps, + * collections and array of objects (but objects inside them will still be converted if they are portable) </li> + * </ul> <p> For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be + * stored in portable format), you should acquire following projection to avoid deserialization: * <pre> * CacheProjection<Integer, GridPortableObject> prj = cache.keepPortable(); * * // Value is not deserialized and returned in portable format. * GridPortableObject po = prj.get(1); * </pre> - * <p> - * Note that this method makes sense only if cache is working in portable mode - * ({@code CacheConfiguration#isPortableEnabled()} returns {@code true}. If not, - * this method is no-op and will return current projection. + * <p> Note that this method makes sense only if cache is working in portable mode ({@code + * CacheConfiguration#isPortableEnabled()} returns {@code true}. If not, this method is no-op and will return + * current projection. * * @return Projection for portable objects. */ @@ -1433,6 +1425,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V curFut.set(new IgniteFutureImpl<>(fut)); } + /** + * @return Legacy proxy. 
+ */ + @NotNull + public GridCacheProxyImpl<K, V> legacyProxy() { + return legacyProxy; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(ctx); @@ -1452,6 +1452,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V prj = (GridCacheProjectionImpl<K, V>)in.readObject(); gate = ctx.gate(); + + legacyProxy = new GridCacheProxyImpl<K, V>(ctx, delegate, prj); } /** {@inheritDoc} */
