ignite-2560 Support resource injection for entry processor, optimizations for resource injection.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9ff97c9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9ff97c9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9ff97c9 Branch: refs/heads/master Commit: f9ff97c91374dcd9cd8ad08d46d1d2de44193060 Parents: 407071e Author: Andrey V. Mashenkov <[email protected]> Authored: Tue Aug 30 09:31:20 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Aug 30 09:32:23 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheLazyEntry.java | 2 + .../EntryProcessorResourceInjectorProxy.java | 105 +++++ .../processors/cache/GridCacheMapEntry.java | 13 +- .../GridNearAtomicSingleUpdateFuture.java | 17 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 18 +- .../transactions/IgniteTxLocalAdapter.java | 5 +- .../processors/resource/GridResourceIoc.java | 438 +++++++++++++++---- .../resource/GridResourceProcessor.java | 396 +++++++---------- .../cache/GridCacheAbstractFullApiSelfTest.java | 404 +++++++++++++++-- .../cache/GridCacheAbstractSelfTest.java | 140 +++++- .../GridCacheTransformEventSelfTest.java | 66 ++- ...ePartitionedBasicStoreMultiNodeSelfTest.java | 2 + .../GridTransformSpringInjectionSelfTest.java | 186 ++++++++ .../testsuites/IgniteSpringTestSuite.java | 7 +- .../IgniteInvokeWithInjectionBenchmark.java | 74 ++++ .../IgniteInvokeWithInjectionTxBenchmark.java | 30 ++ 17 files changed, 1515 insertions(+), 396 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java index c8cfc99..02cccc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java @@ -191,6 +191,8 @@ public class CacheLazyEntry<K, V> extends CacheInterceptorEntry<K, V> { @Override public <T> T unwrap(Class<T> cls) { if (cls.isAssignableFrom(Ignite.class)) return (T)cctx.kernalContext().grid(); + else if (cls.isAssignableFrom(GridCacheContext.class)) + return (T)cctx; else if (cls.isAssignableFrom(getClass())) return cls.cast(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java new file mode 100644 index 0000000..76b2511 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EntryProcessorResourceInjectorProxy.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.resource.GridResourceIoc; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; +import org.jetbrains.annotations.Nullable; + +/** + * Entry processor wrapper injecting Ignite resources into target processor before execution. + */ +public class EntryProcessorResourceInjectorProxy<K, V, T> implements EntryProcessor<K, V, T>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Delegate. */ + private EntryProcessor<K, V, T> delegate; + + /** Injected flag. */ + private transient boolean injected; + + /** + * @param delegate Delegate. + */ + private EntryProcessorResourceInjectorProxy(EntryProcessor<K, V, T> delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException { + if (!injected) { + GridCacheContext cctx = entry.unwrap(GridCacheContext.class); + + GridResourceProcessor rsrc = cctx.kernalContext().resource(); + + try { + rsrc.inject(delegate, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, cctx.name()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + injected = true; + } + + return delegate.process(entry, arguments); + } + + /** + * @return Delegate entry processor. + */ + public EntryProcessor<K, V, T> delegate() { + return delegate; + } + + /** + * Wraps EntryProcessor if needed. + * + * @param ctx Context. + * @param proc Entry proc. + * @return Wrapped entry proc if wrapping is needed. + */ + public static <K, V, T> EntryProcessor<K, V, T> wrap(GridKernalContext ctx, + @Nullable EntryProcessor<K, V, T> proc) { + if (proc == null || proc instanceof EntryProcessorResourceInjectorProxy) + return proc; + + GridResourceProcessor rsrcProcessor = ctx.resource(); + + return rsrcProcessor.isAnnotationsPresent(null, proc, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR) ? + new EntryProcessorResourceInjectorProxy<>(proc) : proc; + } + + /** + * Unwraps EntryProcessor as Object if needed. + * + * @param obj Entry processor. + * @return Unwrapped entry processor. + */ + static Object unwrap(Object obj) { + return (obj instanceof EntryProcessorResourceInjectorProxy) ? ((EntryProcessorResourceInjectorProxy)obj).delegate() : obj; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f692bf4..c760ac1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -896,6 +896,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); + cctx.events().addEvent( partition(), key, @@ -1004,7 +1006,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme deletedUnlocked(false); } - if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) + if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { + transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); + cctx.events().addEvent( partition(), key, @@ -1019,6 +1023,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme transformClo != null ? transformClo.getClass().getName() : null, taskName, keepBinary); + } } } @@ -1685,7 +1690,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Calculate new value. if (op == GridCacheOperation.TRANSFORM) { - transformCloClsName = writeObj.getClass().getName(); + transformCloClsName = EntryProcessorResourceInjectorProxy.unwrap(writeObj).getClass().getName(); EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; @@ -2463,6 +2468,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { evtOld = cctx.unwrapTemporary(oldVal); + transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); + cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName, @@ -2553,6 +2560,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) { evtOld = cctx.unwrapTemporary(oldVal); + transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo); + cctx.events().addEvent(partition(), key, evtNodeId, null, newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld, evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName, http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 661a178..256c7ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -17,6 +17,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -26,6 +33,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -43,13 +51,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; -import javax.cache.expiry.ExpiryPolicy; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.UUID; - import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -549,6 +550,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda if (op != TRANSFORM) val = cctx.toCacheObject(val); + else + val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 2432f63..30a0c3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; +import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -800,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu val = conflictPutVal.valueEx(); conflictVer = conflictPutVal.version(); - conflictTtl = conflictPutVal.ttl(); + conflictTtl = conflictPutVal.ttl(); conflictExpireTime = conflictPutVal.expireTime(); } else if (conflictRmvVals != null) { @@ -826,6 +828,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (op != TRANSFORM) val = cctx.toCacheObject(val); + else + val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); List<ClusterNode> affNodes = mapKey(cacheKey, topVer); @@ -940,6 +944,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (op != TRANSFORM) val = cctx.toCacheObject(val); + else + val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val); ClusterNode primary = cctx.affinity().primary(cacheKey.partition(), topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index ac08f8f..a419887 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -57,6 +57,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.resource.GridResourceIoc; +import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@ -432,7 +434,6 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { needVer); } - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @@ -511,7 +512,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { entry = swapOrOffheap ? entryEx(cacheKey) : peekEx(cacheKey); if (entry != null) { - CacheObject v ; + CacheObject v; GridCacheVersion ver; if (needVer) { @@ -541,7 +542,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { deserializeBinary, true, ver); - }else + } + else success = false; } else { @@ -944,6 +946,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (op == UPDATE) val = ctx.toCacheObject(val); + else if (op == TRANSFORM) + ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name()); while (true) { GridCacheEntryEx entry = null; @@ -1014,7 +1018,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { if (err != null) throw err; - Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) : + Object ret = res == null ? null : rawRetval ? new GridCacheReturn(ctx, true, keepBinary, res.get2(), res.get1()) : (retval || op == TRANSFORM) ? res.get2() : res.get1(); if (op == TRANSFORM && ret == null) @@ -1035,8 +1039,8 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { * @param filter Optional filter. * @param subjId Subject ID. * @param taskName Task name. - * @throws CachePartialUpdateCheckedException If update failed. * @return Results map for invoke operation. + * @throws CachePartialUpdateCheckedException If update failed. */ @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"}) private Map<K, EntryProcessorResult> updateWithBatch( @@ -1101,6 +1105,10 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { } if (op == TRANSFORM) { + ctx.kernalContext().resource().inject(val, + GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, + ctx.name()); + EntryProcessor<Object, Object, Object> entryProcessor = (EntryProcessor<Object, Object, Object>)val; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 9ad7fb0..ee992cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheOperationContext; +import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -65,7 +66,6 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -89,6 +89,7 @@ import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; @@ -3664,7 +3665,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig this, op, val, - entryProcessor, + EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), entryProcessor), invokeArgs, hasDrTtl ? drTtl : -1L, entry, http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java index 35824fa..0158973 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java @@ -21,12 +21,12 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.util.GridLeanIdentitySet; @@ -35,6 +35,17 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.resources.CacheNameResource; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.resources.JobContextResource; +import org.apache.ignite.resources.LoadBalancerResource; +import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.resources.ServiceResource; +import org.apache.ignite.resources.SpringApplicationContextResource; +import org.apache.ignite.resources.SpringResource; +import org.apache.ignite.resources.TaskContinuousMapperResource; +import org.apache.ignite.resources.TaskSessionResource; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -42,17 +53,12 @@ import org.jsr166.ConcurrentHashMap8; * Resource container contains caches for classes used for injection. * Caches used to improve the efficiency of standard Java reflection mechanism. */ -class GridResourceIoc { +public class GridResourceIoc { /** Task class resource mapping. Used to efficiently cleanup resources related to class loader. */ - private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap = - new ConcurrentHashMap8<>(); + private final ConcurrentMap<ClassLoader, Set<Class<?>>> taskMap = new ConcurrentHashMap8<>(); /** Class descriptors cache. */ - private final ConcurrentMap<Class<?>, ClassDescriptor> clsDescs = new ConcurrentHashMap8<>(); - - /** */ - private final ConcurrentMap<Class<?>, Class<? extends Annotation>[]> annCache = - new ConcurrentHashMap8<>(); + private AtomicReference<Map<Class<?>, ClassDescriptor>> clsDescs = new AtomicReference<>(); /** * @param ldr Class loader. @@ -61,8 +67,22 @@ class GridResourceIoc { Set<Class<?>> clss = taskMap.remove(ldr); if (clss != null) { - clsDescs.keySet().removeAll(clss); - annCache.keySet().removeAll(clss); + Map<Class<?>, ClassDescriptor> newMap, oldMap; + + do { + oldMap = clsDescs.get(); + + if (oldMap == null) + break; + + newMap = new HashMap<>(oldMap.size() - clss.size()); + + for (Map.Entry<Class<?>, ClassDescriptor> entry : oldMap.entrySet()) { + if (!clss.contains(entry.getKey())) + newMap.put(entry.getKey(), entry.getValue()); + } + } + while (!clsDescs.compareAndSet(oldMap, newMap)); } } @@ -71,8 +91,8 @@ class GridResourceIoc { */ void undeployAll() { taskMap.clear(); - clsDescs.clear(); - annCache.clear(); + + clsDescs.set(null); } /** @@ -83,8 +103,8 @@ class GridResourceIoc { * @param injector Resource to inject. * @param dep Deployment. * @param depCls Deployment class. - * @throws IgniteCheckedException Thrown in case of any errors during injection. * @return {@code True} if resource was injected. + * @throws IgniteCheckedException Thrown in case of any errors during injection. */ @SuppressWarnings("SimplifiableIfStatement") boolean inject(Object target, @@ -92,26 +112,41 @@ class GridResourceIoc { GridResourceInjector injector, @Nullable GridDeployment dep, @Nullable Class<?> depCls) - throws IgniteCheckedException - { + throws IgniteCheckedException { return injectInternal(target, annCls, injector, dep, depCls, null); } /** + * @param dep Deployment. * @param cls Class. + * @return Descriptor. */ - private ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) { - ClassDescriptor res = clsDescs.get(cls); + ClassDescriptor descriptor(@Nullable GridDeployment dep, Class<?> cls) { + Map<Class<?>, ClassDescriptor> newMap, oldMap; + ClassDescriptor res, newDesc = null; + + do { + oldMap = clsDescs.get(); + + if (oldMap != null && (res = oldMap.get(cls)) != null) + break; - if (res == null) { if (dep != null) { Set<Class<?>> classes = F.addIfAbsent(taskMap, dep.classLoader(), F.<Class<?>>newCSet()); classes.add(cls); + + dep = null; } - res = F.addIfAbsent(clsDescs, cls, new ClassDescriptor(cls)); + if (oldMap == null) + newMap = new HashMap<>(); + else + (newMap = new HashMap<>(oldMap.size() + 1)).putAll(oldMap); + + newMap.put(cls, res = newDesc == null ? (newDesc = new ClassDescriptor(cls)) : newDesc); } + while (!clsDescs.compareAndSet(oldMap, newMap)); return res; } @@ -123,8 +158,8 @@ class GridResourceIoc { * @param dep Deployment. * @param depCls Deployment class. * @param checkedObjs Set of already inspected objects to avoid indefinite recursion. - * @throws IgniteCheckedException Thrown in case of any errors during injection. * @return {@code True} if resource was injected. + * @throws IgniteCheckedException Thrown in case of any errors during injection. */ private boolean injectInternal(Object target, Class<? extends Annotation> annCls, @@ -132,56 +167,14 @@ class GridResourceIoc { @Nullable GridDeployment dep, @Nullable Class<?> depCls, @Nullable Set<Object> checkedObjs) - throws IgniteCheckedException - { + throws IgniteCheckedException { Class<?> targetCls = target.getClass(); ClassDescriptor descr = descriptor(dep, targetCls); T2<GridResourceField[], GridResourceMethod[]> annotatedMembers = descr.annotatedMembers(annCls); - if (descr.recursiveFields().length == 0 && annotatedMembers == null) - return false; - - if (checkedObjs == null && descr.recursiveFields().length > 0) - checkedObjs = new GridLeanIdentitySet<>(); - - if (checkedObjs != null && !checkedObjs.add(target)) - return false; - - boolean injected = false; - - for (Field field : descr.recursiveFields()) { - try { - Object obj = field.get(target); - - if (obj != null) { - assert checkedObjs != null; - - injected |= injectInternal(obj, annCls, injector, dep, depCls, checkedObjs); - } - } - catch (IllegalAccessException e) { - throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() + - ", target=" + target + ']', e); - } - } - - if (annotatedMembers != null) { - for (GridResourceField field : annotatedMembers.get1()) { - injector.inject(field, target, depCls, dep); - - injected = true; - } - - for (GridResourceMethod mtd : annotatedMembers.get2()) { - injector.inject(mtd, target, depCls, dep); - - injected = true; - } - } - - return injected; + return descr.injectInternal(target, annCls, annotatedMembers, injector, dep, depCls, checkedObjs); } /** @@ -202,36 +195,18 @@ class GridResourceIoc { } /** + * Checks if annotation is presented on a field or method of the specified object. + * + * @param target Target object. + * @param annSet Annotation classes to find on fields or methods of target object. * @param dep Deployment. - * @param target Target. - * @param annClss Annotations. - * @return Filtered set of annotations that present in target. + * @return {@code true} if any annotation is presented, {@code false} if it's not. */ - @SuppressWarnings({"SuspiciousToArrayCall", "unchecked"}) - Class<? extends Annotation>[] filter( - @Nullable GridDeployment dep, Object target, - Collection<Class<? extends Annotation>> annClss) { + boolean isAnnotationsPresent(@Nullable GridDeployment dep, Object target, AnnotationSet annSet) { assert target != null; - assert annClss != null && !annClss.isEmpty(); + assert annSet != null; - Class<?> cls = target.getClass(); - - Class<? extends Annotation>[] res = annCache.get(cls); - - if (res == null) { - Collection<Class<? extends Annotation>> res0 = new ArrayList<>(); - - for (Class<? extends Annotation> annCls : annClss) { - if (isAnnotationPresent(target, annCls, dep)) - res0.add(annCls); - } - - res = res0.toArray(new Class[res0.size()]); - - annCache.putIfAbsent(cls, res); - } - - return res; + return descriptor(dep, target.getClass()).isAnnotated(annSet) != 0; } /** @@ -251,16 +226,18 @@ class GridResourceIoc { return t2 == null ? GridResourceMethod.EMPTY_ARRAY : t2.get2(); } - /** {@inheritDoc} */ + /** Print memory statistics */ public void printMemoryStats() { X.println(">>> taskMapSize: " + taskMap.size()); - X.println(">>> classDescriptorsCacheSize: " + clsDescs.size()); + + Map<Class<?>, ClassDescriptor> map = clsDescs.get(); + X.println(">>> classDescriptorsCacheSize: " + (map == null ? 0 : map.size())); } /** * */ - private static class ClassDescriptor { + class ClassDescriptor { /** */ private final Field[] recursiveFields; @@ -268,8 +245,18 @@ class GridResourceIoc { private final Map<Class<? extends Annotation>, T2<GridResourceField[], GridResourceMethod[]>> annMap; /** + * Uses as enum-map with enum {@link AnnotationSet} member as key, + * and bitmap as a result of matching found annotations with enum set {@link ResourceAnnotation} as value. + */ + private final int[] containsAnnSets; + + /** Uses as enum-map with enum {@link ResourceAnnotation} member as a keys. */ + private final T2<GridResourceField[], GridResourceMethod[]>[] annArr; + + /** * @param cls Class. */ + @SuppressWarnings("unchecked") ClassDescriptor(Class<?> cls) { Map<Class<? extends Annotation>, T2<List<GridResourceField>, List<GridResourceMethod>>> annMap = new HashMap<>(); @@ -335,20 +322,277 @@ class GridResourceIoc { this.annMap.put(entry.getKey(), new T2<>(fields, mtds)); } + + T2<GridResourceField[], GridResourceMethod[]>[] annArr = null; + + if (annMap.isEmpty()) + containsAnnSets = null; + else { + int annotationsBits = 0; + + for (ResourceAnnotation ann : ResourceAnnotation.values()) { + T2<GridResourceField[], GridResourceMethod[]> member = annotatedMembers(ann.clazz); + + if (member != null) { + if (annArr == null) + annArr = new T2[ResourceAnnotation.values().length]; + + annArr[ann.ordinal()] = member; + + annotationsBits |= 1 << ann.ordinal(); + } + } + + AnnotationSet[] annotationSets = AnnotationSet.values(); + + containsAnnSets = new int[annotationSets.length]; + + for (int i = 0; i < annotationSets.length; i++) + containsAnnSets[i] = annotationsBits & annotationSets[i].annotationsBitSet; + } + + this.annArr = annArr; } /** * @return Recursive fields. */ - public Field[] recursiveFields() { + Field[] recursiveFields() { return recursiveFields; } /** + * @param annCls Annotation class. * @return Fields. */ - @Nullable public T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) { + @Nullable T2<GridResourceField[], GridResourceMethod[]> annotatedMembers(Class<? extends Annotation> annCls) { return annMap.get(annCls); } + + /** + * @param set annotation set. + * @return {@code Bitmask} > 0 if any annotation is presented, otherwise return 0; + */ + int isAnnotated(AnnotationSet set) { + return recursiveFields.length > 0 ? set.annotationsBitSet : + (containsAnnSets == null ? 0 : containsAnnSets[set.ordinal()]); + } + + /** + * @param ann Annotation. + * @return {@code True} if annotation is presented. + */ + boolean isAnnotated(ResourceAnnotation ann) { + return recursiveFields.length > 0 || (annArr != null && annArr[ann.ordinal()] != null); + } + + /** + * @param target Target object. + * @param annCls Annotation class. + * @param annotatedMembers Setter annotation. + * @param injector Resource to inject. + * @param dep Deployment. + * @param depCls Deployment class. + * @param checkedObjs Set of already inspected objects to avoid indefinite recursion. + * @return {@code True} if resource was injected. + * @throws IgniteCheckedException Thrown in case of any errors during injection. + */ + boolean injectInternal(Object target, + Class<? extends Annotation> annCls, + T2<GridResourceField[], GridResourceMethod[]> annotatedMembers, + GridResourceInjector injector, + @Nullable GridDeployment dep, + @Nullable Class<?> depCls, + @Nullable Set<Object> checkedObjs) + throws IgniteCheckedException { + if (recursiveFields.length == 0 && annotatedMembers == null) + return false; + + if (checkedObjs == null && recursiveFields.length > 0) + checkedObjs = new GridLeanIdentitySet<>(); + + if (checkedObjs != null && !checkedObjs.add(target)) + return false; + + boolean injected = false; + + for (Field field : recursiveFields) { + try { + Object obj = field.get(target); + + if (obj != null) { + assert checkedObjs != null; + + ClassDescriptor desc = descriptor(dep, obj.getClass()); + injected |= desc.injectInternal(obj, annCls, desc.annotatedMembers(annCls), + injector, dep, depCls, checkedObjs); + } + } + catch (IllegalAccessException e) { + throw new IgniteCheckedException("Failed to inject resource [field=" + field.getName() + + ", target=" + target + ']', e); + } + } + + if (annotatedMembers != null) { + for (GridResourceField field : annotatedMembers.get1()) { + injector.inject(field, target, depCls, dep); + + injected = true; + } + + for (GridResourceMethod mtd : annotatedMembers.get2()) { + injector.inject(mtd, target, depCls, dep); + + injected = true; + } + } + + return injected; + } + + /** + * @param target Target object. + * @param ann Setter annotation. + * @param injector Resource to inject. + * @param dep Deployment. + * @param depCls Deployment class. + * @return {@code True} if resource was injected. + * @throws IgniteCheckedException Thrown in case of any errors during injection. + */ + public boolean inject(Object target, + ResourceAnnotation ann, + GridResourceInjector injector, + @Nullable GridDeployment dep, + @Nullable Class<?> depCls) + throws IgniteCheckedException { + return injectInternal(target, + ann.clazz, + annArr == null ? null : annArr[ann.ordinal()], + injector, + dep, + depCls, + null); + } + } + + /** + * + */ + enum ResourceAnnotation { + /** */ + CACHE_NAME(CacheNameResource.class), + + /** */ + SPRING_APPLICATION_CONTEXT(SpringApplicationContextResource.class), + + /** */ + SPRING(SpringResource.class), + + /** */ + IGNITE_INSTANCE(IgniteInstanceResource.class), + + /** */ + LOGGER(LoggerResource.class), + + /** */ + SERVICE(ServiceResource.class), + + /** */ + TASK_SESSION(TaskSessionResource.class), + + /** */ + LOAD_BALANCER(LoadBalancerResource.class), + + /** */ + TASK_CONTINUOUS_MAPPER(TaskContinuousMapperResource.class), + + /** */ + JOB_CONTEXT(JobContextResource.class), + + /** */ + CACHE_STORE_SESSION(CacheStoreSessionResource.class); + + /** */ + public final Class<? extends Annotation> clazz; + + /** + * @param clazz annotation class. + */ + ResourceAnnotation(Class<? extends Annotation> clazz) { + this.clazz = clazz; + } + } + + /** + * + */ + public enum AnnotationSet { + /** */ + GENERIC( + ResourceAnnotation.SPRING_APPLICATION_CONTEXT, + ResourceAnnotation.SPRING, + ResourceAnnotation.IGNITE_INSTANCE, + ResourceAnnotation.LOGGER, + ResourceAnnotation.SERVICE + ), + + /** */ + ENTRY_PROCESSOR( + ResourceAnnotation.CACHE_NAME, + + ResourceAnnotation.SPRING_APPLICATION_CONTEXT, + ResourceAnnotation.SPRING, + ResourceAnnotation.IGNITE_INSTANCE, + ResourceAnnotation.LOGGER, + ResourceAnnotation.SERVICE + ), + + /** */ + TASK( + ResourceAnnotation.TASK_SESSION, + ResourceAnnotation.LOAD_BALANCER, + ResourceAnnotation.TASK_CONTINUOUS_MAPPER, + + ResourceAnnotation.SPRING_APPLICATION_CONTEXT, + ResourceAnnotation.SPRING, + ResourceAnnotation.IGNITE_INSTANCE, + ResourceAnnotation.LOGGER, + ResourceAnnotation.SERVICE + ), + + /** */ + JOB( + ResourceAnnotation.TASK_SESSION, + ResourceAnnotation.JOB_CONTEXT, + + ResourceAnnotation.SPRING_APPLICATION_CONTEXT, + ResourceAnnotation.SPRING, + ResourceAnnotation.IGNITE_INSTANCE, + ResourceAnnotation.LOGGER, + ResourceAnnotation.SERVICE + ); + + /** Resource annotations bits for fast checks. */ + public final int annotationsBitSet; + + /** Holds annotations in order */ + public final ResourceAnnotation[] annotations; + + /** + * @param annotations ResourceAnnotations. + */ + AnnotationSet(ResourceAnnotation... annotations) { + assert annotations.length < 32 : annotations.length; + + this.annotations = annotations; + + int mask = 0; + + for (ResourceAnnotation ann : annotations) + mask |= 1 << ann.ordinal(); + + annotationsBitSet = mask; + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f9ff97c9/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java index afe0ef1..84d07b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java @@ -20,12 +20,11 @@ package org.apache.ignite.internal.processors.resource; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Arrays; import java.util.Collection; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobContext; import org.apache.ignite.compute.ComputeLoadBalancer; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.compute.ComputeTaskContinuousMapper; @@ -34,22 +33,10 @@ import org.apache.ignite.internal.GridInternalWrapper; import org.apache.ignite.internal.GridJobContextImpl; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTaskSessionImpl; -import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.lifecycle.LifecycleBean; -import org.apache.ignite.resources.CacheNameResource; -import org.apache.ignite.resources.CacheStoreSessionResource; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.JobContextResource; -import org.apache.ignite.resources.LoadBalancerResource; -import org.apache.ignite.resources.LoggerResource; -import org.apache.ignite.resources.ServiceResource; -import org.apache.ignite.resources.SpringApplicationContextResource; -import org.apache.ignite.resources.SpringResource; -import org.apache.ignite.resources.TaskContinuousMapperResource; -import org.apache.ignite.resources.TaskSessionResource; import org.apache.ignite.services.Service; import org.apache.ignite.spi.IgniteSpi; import org.jetbrains.annotations.Nullable; @@ -58,42 +45,6 @@ import org.jetbrains.annotations.Nullable; * Processor for all Ignite and task/job resources. */ public class GridResourceProcessor extends GridProcessorAdapter { - /** */ - private static final Collection<Class<? extends Annotation>> JOB_INJECTIONS = Arrays.asList( - TaskSessionResource.class, - JobContextResource.class, - IgniteInstanceResource.class, - SpringApplicationContextResource.class, - SpringResource.class, - LoggerResource.class, - ServiceResource.class); - - /** */ - private static final Collection<Class<? extends Annotation>> TASK_INJECTIONS = Arrays.asList( - TaskSessionResource.class, - LoadBalancerResource.class, - TaskContinuousMapperResource.class, - IgniteInstanceResource.class, - SpringApplicationContextResource.class, - SpringResource.class, - LoggerResource.class, - ServiceResource.class); - - /** Grid instance injector. */ - private GridResourceBasicInjector<IgniteEx> gridInjector; - - /** Spring application context injector. */ - private GridResourceInjector springCtxInjector; - - /** Logger injector. */ - private GridResourceBasicInjector<IgniteLogger> logInjector; - - /** Services injector. */ - private GridResourceBasicInjector<Collection<Service>> srvcInjector; - - /** Spring bean resources injector. */ - private GridResourceInjector springBeanInjector; - /** Cleaning injector. */ private final GridResourceInjector nullInjector = new GridResourceBasicInjector<>(null); @@ -103,6 +54,9 @@ public class GridResourceProcessor extends GridProcessorAdapter { /** */ private final GridResourceIoc ioc = new GridResourceIoc(); + /** */ + private final GridResourceInjector[] injectorByAnnotation; + /** * Creates resources processor. * @@ -111,9 +65,14 @@ public class GridResourceProcessor extends GridProcessorAdapter { public GridResourceProcessor(GridKernalContext ctx) { super(ctx); - gridInjector = new GridResourceBasicInjector<>(ctx.grid()); - logInjector = new GridResourceLoggerInjector(ctx.config().getGridLogger()); - srvcInjector = new GridResourceServiceInjector(ctx.grid()); + injectorByAnnotation = new GridResourceInjector[GridResourceIoc.ResourceAnnotation.values().length]; + + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SERVICE.ordinal()] = + new GridResourceServiceInjector(ctx.grid()); + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.LOGGER.ordinal()] = + new GridResourceLoggerInjector(ctx.config().getGridLogger()); + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.IGNITE_INSTANCE.ordinal()] = + new GridResourceBasicInjector<>(ctx.grid()); } /** {@inheritDoc} */ @@ -138,8 +97,12 @@ public class GridResourceProcessor extends GridProcessorAdapter { public void setSpringContext(@Nullable GridSpringResourceContext rsrcCtx) { this.rsrcCtx = rsrcCtx; - springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector; - springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector; + GridResourceInjector springCtxInjector = rsrcCtx != null ? rsrcCtx.springContextInjector() : nullInjector; + GridResourceInjector springBeanInjector = rsrcCtx != null ? rsrcCtx.springBeanInjector() : nullInjector; + + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING.ordinal()] = springBeanInjector; + injectorByAnnotation[GridResourceIoc.ResourceAnnotation.SPRING_APPLICATION_CONTEXT.ordinal()] = + springCtxInjector; } /** @@ -187,17 +150,15 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ public void inject(GridDeployment dep, Class<?> depCls, Object target) throws IgniteCheckedException { + assert target != null; + if (log.isDebugEnabled()) log.debug("Injecting resources: " + target); // Unwrap Proxy object. target = unwrapTarget(target); - ioc.inject(target, IgniteInstanceResource.class, gridInjector, dep, depCls); - ioc.inject(target, SpringApplicationContextResource.class, springCtxInjector, dep, depCls); - ioc.inject(target, SpringResource.class, springBeanInjector, dep, depCls); - ioc.inject(target, LoggerResource.class, logInjector, dep, depCls); - ioc.inject(target, ServiceResource.class, srvcInjector, dep, depCls); + inject(target, GridResourceIoc.AnnotationSet.GENERIC, dep, depCls); } /** @@ -216,7 +177,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { // Unwrap Proxy object. obj = unwrapTarget(obj); - ioc.inject(obj, CacheNameResource.class, new GridResourceBasicInjector<>(cacheName), null, null); + inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_NAME, null, null, cacheName); } /** @@ -236,7 +197,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { // Unwrap Proxy object. obj = unwrapTarget(obj); - return ioc.inject(obj, CacheStoreSessionResource.class, new GridResourceBasicInjector<>(ses), null, null); + return inject(obj, GridResourceIoc.ResourceAnnotation.CACHE_STORE_SESSION, null, null, ses); } /** @@ -244,6 +205,17 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed to inject. */ public void injectGeneric(Object obj) throws IgniteCheckedException { + inject(obj, GridResourceIoc.AnnotationSet.GENERIC); + } + + /** + * @param obj Object to inject. + * @param annSet Supported annotations. + * @param params Parameters. + * @throws IgniteCheckedException If failed to inject. + */ + public void inject(Object obj, GridResourceIoc.AnnotationSet annSet, Object... params) + throws IgniteCheckedException { assert obj != null; if (log.isDebugEnabled()) @@ -252,33 +224,126 @@ public class GridResourceProcessor extends GridProcessorAdapter { // Unwrap Proxy object. obj = unwrapTarget(obj); - // No deployment for lifecycle beans. - ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null); - ioc.inject(obj, SpringResource.class, springBeanInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null); - ioc.inject(obj, LoggerResource.class, logInjector, null, null); - ioc.inject(obj, ServiceResource.class, srvcInjector, null, null); + inject(obj, annSet, null, null, params); + } + + /** + * @param obj Object to inject. + * @param annSet Supported annotations. + * @param dep Deployment. + * @param depCls Deployment class. + * @param params Parameters. + * @throws IgniteCheckedException If failed to inject. + */ + private void inject(Object obj, + GridResourceIoc.AnnotationSet annSet, + @Nullable GridDeployment dep, + @Nullable Class<?> depCls, + Object... params) + throws IgniteCheckedException { + GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); + + assert clsDesc != null; + + if (clsDesc.isAnnotated(annSet) == 0) + return; + + int i = 0; + for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) { + if (clsDesc.isAnnotated(ann)) { + final GridResourceInjector injector = injectorByAnnotation(ann, i < params.length ? params[i] : null); + + if (injector != null) + clsDesc.inject(obj, ann, injector, dep, depCls); + } + + i++; + } } /** * @param obj Object. + * @param annSet Supported annotations. * @throws IgniteCheckedException If failed. */ - public void cleanupGeneric(Object obj) throws IgniteCheckedException { - if (obj != null) { - if (log.isDebugEnabled()) - log.debug("Cleaning up resources: " + obj); - - // Unwrap Proxy object. - obj = unwrapTarget(obj); - - // Caching key is null for the life-cycle beans. - ioc.inject(obj, LoggerResource.class, nullInjector, null, null); - ioc.inject(obj, ServiceResource.class, nullInjector, null, null); - ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null); - ioc.inject(obj, SpringResource.class, nullInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null); + private void cleanup(Object obj, GridResourceIoc.AnnotationSet annSet) + throws IgniteCheckedException { + assert obj != null; + + if (log.isDebugEnabled()) + log.debug("Cleaning up resources: " + obj); + + // Unwrap Proxy object. + obj = unwrapTarget(obj); + + GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); + + assert clsDesc != null; + + if (clsDesc.isAnnotated(annSet) == 0) + return; + + for (GridResourceIoc.ResourceAnnotation ann : annSet.annotations) + clsDesc.inject(obj, ann, nullInjector, null, null); + } + + /** + * @param ann Annotation. + * @param param Injector parameter. + * @return Injector. + */ + private GridResourceInjector injectorByAnnotation(GridResourceIoc.ResourceAnnotation ann, Object param) { + final GridResourceInjector res; + + switch (ann) { + case CACHE_NAME: + case TASK_SESSION: + case LOAD_BALANCER: + case TASK_CONTINUOUS_MAPPER: + case CACHE_STORE_SESSION: + res = new GridResourceBasicInjector<>(param); + break; + + case JOB_CONTEXT: + res = new GridResourceJobContextInjector((ComputeJobContext)param); + break; + + default: + res = injectorByAnnotation[ann.ordinal()]; + break; } + + return res; + } + + /** + * @param obj Object to inject. + * @throws IgniteCheckedException If failed to inject. + */ + private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep, + @Nullable Class<?> depCls, Object param) + throws IgniteCheckedException { + GridResourceIoc.ClassDescriptor clsDesc = ioc.descriptor(null, obj.getClass()); + + assert clsDesc != null; + + if (clsDesc.isAnnotated(ann)) { + GridResourceInjector injector = injectorByAnnotation(ann, param); + + if (injector != null) + return clsDesc.inject(obj, ann, injector, dep, depCls); + } + + return false; + } + + /** + * @param obj Object. + * @throws IgniteCheckedException If failed. + */ + public void cleanupGeneric(Object obj) throws IgniteCheckedException { + if (obj != null) + cleanup(obj, GridResourceIoc.AnnotationSet.GENERIC); } /** @@ -321,30 +386,8 @@ public class GridResourceProcessor extends GridProcessorAdapter { */ private void injectToJob(GridDeployment dep, Class<?> taskCls, Object job, ComputeTaskSession ses, GridJobContextImpl jobCtx) throws IgniteCheckedException { - Class<? extends Annotation>[] filtered = ioc.filter(dep, job, JOB_INJECTIONS); - - if (filtered.length > 0) { - for (Class<? extends Annotation> annCls : filtered) { - if (annCls == TaskSessionResource.class) - injectBasicResource(job, TaskSessionResource.class, ses, dep, taskCls); - else if (annCls == JobContextResource.class) - ioc.inject(job, JobContextResource.class, new GridResourceJobContextInjector(jobCtx), - dep, taskCls); - else if (annCls == IgniteInstanceResource.class) - ioc.inject(job, IgniteInstanceResource.class, gridInjector, dep, taskCls); - else if (annCls == SpringApplicationContextResource.class) - ioc.inject(job, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls); - else if (annCls == SpringResource.class) - ioc.inject(job, SpringResource.class, springBeanInjector, dep, taskCls); - else if (annCls == LoggerResource.class) - ioc.inject(job, LoggerResource.class, logInjector, dep, taskCls); - else { - assert annCls == ServiceResource.class; - - ioc.inject(job, ServiceResource.class, srvcInjector, dep, taskCls); - } - } - } + + inject(job, GridResourceIoc.AnnotationSet.JOB, dep, taskCls, ses, jobCtx); } /** @@ -365,34 +408,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { // Unwrap Proxy object. Object obj = unwrapTarget(task); - Class<? extends Annotation>[] filtered = ioc.filter(dep, obj, TASK_INJECTIONS); - - if (filtered.length == 0) - return; - - Class<?> taskCls = obj.getClass(); - - for (Class<? extends Annotation> annCls : filtered) { - if (annCls == TaskSessionResource.class) - injectBasicResource(obj, TaskSessionResource.class, ses, dep, taskCls); - else if (annCls == LoadBalancerResource.class) - injectBasicResource(obj, LoadBalancerResource.class, balancer, dep, taskCls); - else if (annCls == TaskContinuousMapperResource.class) - injectBasicResource(obj, TaskContinuousMapperResource.class, mapper, dep, taskCls); - else if (annCls == IgniteInstanceResource.class) - ioc.inject(obj, IgniteInstanceResource.class, gridInjector, dep, taskCls); - else if (annCls == SpringApplicationContextResource.class) - ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, dep, taskCls); - else if (annCls == SpringResource.class) - ioc.inject(obj, SpringResource.class, springBeanInjector, dep, taskCls); - else if (annCls == LoggerResource.class) - ioc.inject(obj, LoggerResource.class, logInjector, dep, taskCls); - else { - assert annCls == ServiceResource.class; - - ioc.inject(obj, ServiceResource.class, srvcInjector, dep, taskCls); - } - } + inject(obj, GridResourceIoc.AnnotationSet.TASK, dep, null, ses, balancer, mapper); } /** @@ -408,24 +424,25 @@ public class GridResourceProcessor extends GridProcessorAdapter { } /** + * Checks if annotations presents in specified object. + * + * @param dep Class deployment. + * @param target Object to check. + * @param annSet Annotations to find. + * @return {@code true} if any annotation is presented, {@code false} if it's not. + */ + public boolean isAnnotationsPresent(GridDeployment dep, Object target, GridResourceIoc.AnnotationSet annSet) { + return ioc.isAnnotationsPresent(dep, target, annSet); + } + + /** * Injects held resources into given SPI implementation. * * @param spi SPI implementation. * @throws IgniteCheckedException Throw in case of any errors. */ public void inject(IgniteSpi spi) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Injecting resources: " + spi); - - // Unwrap Proxy object. - Object obj = unwrapTarget(spi); - - // Caching key is null for the SPIs. - ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null); - ioc.inject(obj, SpringResource.class, springBeanInjector, null, null); - ioc.inject(obj, LoggerResource.class, logInjector, null, null); - ioc.inject(obj, ServiceResource.class, srvcInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null); + injectGeneric(spi); } /** @@ -436,17 +453,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ public void cleanup(IgniteSpi spi) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Cleaning up resources: " + spi); - - // Unwrap Proxy object. - Object obj = unwrapTarget(spi); - - ioc.inject(obj, LoggerResource.class, nullInjector, null, null); - ioc.inject(obj, ServiceResource.class, nullInjector, null, null); - ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null); - ioc.inject(obj, SpringResource.class, nullInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null); + cleanupGeneric(spi); } /** @@ -456,18 +463,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ public void inject(LifecycleBean lifecycleBean) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Injecting resources: " + lifecycleBean); - - // Unwrap Proxy object. - Object obj = unwrapTarget(lifecycleBean); - - // No deployment for lifecycle beans. - ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null); - ioc.inject(obj, SpringResource.class, springBeanInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null); - ioc.inject(obj, LoggerResource.class, logInjector, null, null); - ioc.inject(obj, ServiceResource.class, srvcInjector, null, null); + injectGeneric(lifecycleBean); } /** @@ -478,18 +474,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ public void cleanup(LifecycleBean lifecycleBean) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Cleaning up resources: " + lifecycleBean); - - // Unwrap Proxy object. - Object obj = unwrapTarget(lifecycleBean); - - // Caching key is null for the life-cycle beans. - ioc.inject(obj, LoggerResource.class, nullInjector, null, null); - ioc.inject(obj, ServiceResource.class, nullInjector, null, null); - ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null); - ioc.inject(obj, SpringResource.class, nullInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null); + cleanupGeneric(lifecycleBean); } /** @@ -499,18 +484,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ public void inject(Service svc) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Injecting resources: " + svc); - - // Unwrap Proxy object. - Object obj = unwrapTarget(svc); - - // No deployment for lifecycle beans. - ioc.inject(obj, SpringApplicationContextResource.class, springCtxInjector, null, null); - ioc.inject(obj, SpringResource.class, springBeanInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, gridInjector, null, null); - ioc.inject(obj, LoggerResource.class, logInjector, null, null); - ioc.inject(obj, ServiceResource.class, srvcInjector, null, null); + injectGeneric(svc); } /** @@ -521,39 +495,7 @@ public class GridResourceProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException Thrown in case of any errors. */ public void cleanup(Service svc) throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Cleaning up resources: " + svc); - - // Unwrap Proxy object. - Object obj = unwrapTarget(svc); - - // Caching key is null for the life-cycle beans. - ioc.inject(obj, LoggerResource.class, nullInjector, null, null); - ioc.inject(obj, ServiceResource.class, nullInjector, null, null); - ioc.inject(obj, SpringApplicationContextResource.class, nullInjector, null, null); - ioc.inject(obj, SpringResource.class, nullInjector, null, null); - ioc.inject(obj, IgniteInstanceResource.class, nullInjector, null, null); - } - - /** - * This method is declared public as it is used from tests as well. - * Note, that this method can be used only with unwrapped objects - * (see {@link #unwrapTarget(Object)}). - * - * @param target Target object. - * @param annCls Setter annotation. - * @param rsrc Resource to inject. - * @param dep Deployment. - * @param depCls Deployed class. - * @throws IgniteCheckedException If injection failed. - */ - public void injectBasicResource(Object target, Class<? extends Annotation> annCls, Object rsrc, - GridDeployment dep, Class<?> depCls) throws IgniteCheckedException { - // Safety. - assert !(rsrc instanceof GridResourceInjector) : "Invalid injection."; - - // Basic injection don't cache anything. Use null as a key. - ioc.inject(target, annCls, new GridResourceBasicInjector<>(rsrc), dep, depCls); + cleanupGeneric(svc); } /** @@ -602,4 +544,4 @@ public class GridResourceProcessor extends GridProcessorAdapter { ioc.printMemoryStats(); } -} \ No newline at end of file +}
