IGNITE-3862 - GridServiceProxy invocation never times out. Fix

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb2f2a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb2f2a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb2f2a3

Branch: refs/heads/master
Commit: eeb2f2a38fb792b711eb665e380d14bc00f6e078
Parents: 6d744db
Author: dkarachentsev <[email protected]>
Authored: Mon Dec 12 12:14:01 2016 +0300
Committer: dkarachentsev <[email protected]>
Committed: Mon Dec 12 12:14:01 2016 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteServices.java  |  18 +-
 .../ignite/internal/IgniteServicesImpl.java     |  11 +-
 .../affinity/GridAffinityProcessor.java         |   2 +-
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../CacheDataStructuresManager.java             |   6 +-
 .../cache/query/GridCacheQueryManager.java      |   4 +-
 .../closure/GridClosureProcessor.java           |  32 ++-
 .../internal/processors/job/GridJobWorker.java  |   7 +
 .../platform/services/PlatformServices.java     |   2 +-
 .../service/GridServiceProcessor.java           |  11 +-
 .../processors/service/GridServiceProxy.java    |  18 +-
 .../processors/task/GridTaskWorker.java         |   7 +
 .../IgniteComputeTopologyExceptionTest.java     |   5 +-
 ...gniteServiceProxyTimeoutInitializedTest.java | 284 +++++++++++++++++++
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 15 files changed, 390 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
index 08577c5..83fd487 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
@@ -350,6 +350,22 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
      */
     public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean 
sticky) throws IgniteException;
 
+    /**
+     * Gets a remote handle on the service with timeout. If service is 
available locally,
+     * then local instance is returned and timeout ignored, otherwise, a 
remote proxy is dynamically
+     * created and provided for the specified service.
+     *
+     * @param name Service name.
+     * @param svcItf Interface for the service.
+     * @param sticky Whether or not Ignite should always contact the same 
remote
+     *      service or try to load-balance between services.
+     * @param timeout If greater than 0 created proxy will wait for service 
availability only specified time,
+     *  and will limit remote service invocation time.
+     * @return Either proxy over remote service or local service if it is 
deployed locally.
+     * @throws IgniteException If failed to create service proxy.
+     */
+    public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean 
sticky, long timeout) throws IgniteException;
+
     /** {@inheritDoc} */
     @Override public IgniteServices withAsync();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index b8042c3..400f28d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -222,14 +222,21 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     /** {@inheritDoc} */
     @Override public <T> T serviceProxy(String name, Class<? super T> svcItf, 
boolean sticky)
         throws IgniteException {
+        return (T) serviceProxy(name, svcItf, sticky, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T serviceProxy(final String name, final Class<? super 
T> svcItf, final boolean sticky,
+        final long timeout) throws IgniteException {
         A.notNull(name, "name");
         A.notNull(svcItf, "svcItf");
         A.ensure(svcItf.isInterface(), "Service class must be an interface: " 
+ svcItf);
+        A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout);
 
         guard();
 
         try {
-            return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky);
+            return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky, 
timeout);
         }
         finally {
             unguard();
@@ -289,4 +296,4 @@ public class IgniteServicesImpl extends AsyncSupportAdapter 
implements IgniteSer
     protected Object readResolve() throws ObjectStreamException {
         return prj.services();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index b9182ae..b6efafb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -497,7 +497,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
     private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, 
AffinityTopologyVersion topVer, ClusterNode n)
         throws IgniteCheckedException {
         GridTuple3<GridAffinityMessage, GridAffinityMessage, 
GridAffinityAssignment> t = ctx.closure()
-            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), 
F.asList(n), true/*system pool*/).get();
+            .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), 
F.asList(n), true/*system pool*/, 0).get();
 
         AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), 
t.get1());
         AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), 
t.get2());

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index b30ec70..4d59d50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3628,13 +3628,15 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return ctx.closures().callAsyncNoFailover(BROADCAST,
                 new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, 
keepBinary),
                 nodes,
-                true);
+                true,
+                0);
         }
         else {
             return ctx.closures().callAsyncNoFailover(BROADCAST,
                 new LoadKeysCallable<>(ctx.name(), keys, update, plc),
                 nodes,
-                true);
+                true,
+                0);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index c018f71..c1983df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -491,7 +491,8 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
                     cctx.closures().callAsyncNoFailover(BROADCAST,
                         new BlockSetCallable(cctx.name(), id),
                         nodes,
-                        true).get();
+                        true,
+                        0).get();
                 }
                 catch (IgniteCheckedException e) {
                     if (e.hasCause(ClusterTopologyCheckedException.class)) {
@@ -514,7 +515,8 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter {
                     cctx.closures().callAsyncNoFailover(BROADCAST,
                         new RemoveSetDataCallable(cctx.name(), id, topVer),
                         nodes,
-                        true).get();
+                        true,
+                        0).get();
                 }
                 catch (IgniteCheckedException e) {
                     if (e.hasCause(ClusterTopologyCheckedException.class)) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index d4decb4..1165157 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -220,7 +220,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
         if (detailMetricsSz > 0)
             detailMetrics = new ConcurrentHashMap8<>(detailMetricsSz);
-        
+
         lsnr = new GridLocalEventListener() {
             @Override public void onEvent(Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
@@ -2237,7 +2237,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             // Get metadata from remote nodes.
             if (!nodes.isEmpty())
-                rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, 
Collections.singleton(job), nodes, true);
+                rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, 
Collections.singleton(job), nodes, true, 0);
 
             // Get local metadata.
             IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = 
cctx.closures().callLocalSafe(job, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 9d295d3..3ed985e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -81,6 +81,7 @@ import static 
org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER;
 import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE;
 import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER;
 import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID;
+import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT;
 
 /**
  *
@@ -520,9 +521,15 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<R> callAsyncNoFailover(GridClosureCallMode 
mode, @Nullable Callable<R> job,
-        @Nullable Collection<ClusterNode> nodes, boolean sys) {
+    public <R> IgniteInternalFuture<R> callAsyncNoFailover(
+        GridClosureCallMode mode,
+        @Nullable Callable<R> job,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys,
+        long timeout
+    ) {
         assert mode != null;
+        assert timeout >= 0 : timeout;
 
         busyLock.readLock();
 
@@ -536,6 +543,9 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             ctx.task().setThreadContext(TC_NO_FAILOVER, true);
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
+            if (timeout > 0)
+                ctx.task().setThreadContext(TC_TIMEOUT, timeout);
+
             return ctx.task().execute(new T7<>(mode, job), null, sys);
         }
         finally {
@@ -548,13 +558,19 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param jobs Closures to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param timeout If greater than 0 limits task execution. Cannot be 
negative.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
-    public <R> IgniteInternalFuture<Collection<R>> 
callAsyncNoFailover(GridClosureCallMode mode,
-        @Nullable Collection<? extends Callable<R>> jobs, @Nullable 
Collection<ClusterNode> nodes,
-        boolean sys) {
+    public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(
+        GridClosureCallMode mode,
+        @Nullable Collection<? extends Callable<R>> jobs,
+        @Nullable Collection<ClusterNode> nodes,
+        boolean sys,
+        long timeout
+    ) {
         assert mode != null;
+        assert timeout >= 0 : timeout;
 
         busyLock.readLock();
 
@@ -568,6 +584,9 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             ctx.task().setThreadContext(TC_NO_FAILOVER, true);
             ctx.task().setThreadContext(TC_SUBGRID, nodes);
 
+            if (timeout > 0)
+                ctx.task().setThreadContext(TC_TIMEOUT, timeout);
+
             return ctx.task().execute(new T6<>(mode, jobs), null, sys);
         }
         finally {
@@ -580,6 +599,7 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
      * @param job Closure to execute.
      * @param nodes Grid nodes.
      * @param sys If {@code true}, then system pool will be used.
+     * @param timeout If greater than 0 limits task execution. Cannot be 
negative.
      * @param <R> Type.
      * @return Grid future for collection of closure results.
      */
@@ -2304,4 +2324,4 @@ public class GridClosureProcessor extends 
GridProcessorAdapter {
             return S.toString(C4MLAV2.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index f5c6a27..9bee849 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -43,14 +43,17 @@ import org.apache.ignite.internal.GridJobExecuteResponse;
 import org.apache.ignite.internal.GridJobSessionImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import 
org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -587,6 +590,10 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
                         else
                             U.warn(log, msg);
                     }
+                    else if (X.hasCause(e, GridServiceNotFoundException.class) 
||
+                        X.hasCause(e, ClusterTopologyCheckedException.class))
+                        // Should be throttled, because GridServiceProxy 
continuously retry getting service.
+                        LT.error(log, e, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']');
                     else
                         U.error(log, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']', e);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
index 962a4c0..c266986 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -340,7 +340,7 @@ public class PlatformServices extends 
PlatformAbstractTarget {
 
                 Object proxy = 
PlatformService.class.isAssignableFrom(d.serviceClass())
                     ? services.serviceProxy(name, PlatformService.class, 
sticky)
-                    : new GridServiceProxy<>(services.clusterGroup(), name, 
Service.class, sticky,
+                    : new GridServiceProxy<>(services.clusterGroup(), name, 
Service.class, sticky, 0,
                         platformCtx.kernalContext());
 
                 return new ServiceProxyHolder(proxy, d.serviceClass());

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index b9b92b8..3690f35 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -694,9 +694,10 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
 
     /**
      * @param name Service name.
+     * @param timeout If greater than 0 limits task execution time. Cannot be 
negative.
      * @return Service topology.
      */
-    public Map<UUID, Integer> serviceTopology(String name) throws 
IgniteCheckedException {
+    public Map<UUID, Integer> serviceTopology(String name, long timeout) 
throws IgniteCheckedException {
         ClusterNode node = cache.affinity().mapKeyToNode(name);
 
         if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) {
@@ -708,7 +709,8 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
                 GridClosureCallMode.BROADCAST,
                 call,
                 Collections.singletonList(node),
-                false
+                false,
+                timeout
             ).get();
         }
         else
@@ -828,12 +830,13 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
      * @param name Service name.
      * @param svcItf Service class.
      * @param sticky Whether multi-node request should be done.
+     * @param timeout If greater than 0 limits service acquire time. Cannot be 
negative.
      * @param <T> Service interface type.
      * @return The proxy of a service by its name and class.
      * @throws IgniteException If failed to create proxy.
      */
     @SuppressWarnings("unchecked")
-    public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> 
svcItf, boolean sticky)
+    public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> 
svcItf, boolean sticky, long timeout)
         throws IgniteException {
         if (hasLocalNode(prj)) {
             ServiceContextImpl ctx = serviceContext(name);
@@ -851,7 +854,7 @@ public class GridServiceProcessor extends 
GridProcessorAdapter {
             }
         }
 
-        return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy();
+        return new GridServiceProxy<T>(prj, name, svcItf, sticky, timeout, 
ctx).proxy();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 564a13a..aa60934 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -84,11 +84,15 @@ public class GridServiceProxy<T> implements Serializable {
     /** Whether multi-node request should be done. */
     private final boolean sticky;
 
+    /** Service availability wait timeout. */
+    private final long waitTimeout;
+
     /**
      * @param prj Grid projection.
      * @param name Service name.
      * @param svc Service type class.
      * @param sticky Whether multi-node request should be done.
+     * @param timeout Service availability wait timeout. Cannot be negative.
      * @param ctx Context.
      */
     @SuppressWarnings("unchecked")
@@ -96,12 +100,16 @@ public class GridServiceProxy<T> implements Serializable {
         String name,
         Class<? super T> svc,
         boolean sticky,
+        long timeout,
         GridKernalContext ctx)
     {
+        assert timeout >= 0 : timeout;
+
         this.prj = prj;
         this.ctx = ctx;
         this.name = name;
         this.sticky = sticky;
+        this.waitTimeout = timeout;
         hasLocNode = hasLocalNode(prj);
 
         log = ctx.log(getClass());
@@ -145,6 +153,8 @@ public class GridServiceProxy<T> implements Serializable {
         ctx.gateway().readLock();
 
         try {
+            final long startTime = U.currentTimeMillis();
+
             while (true) {
                 ClusterNode node = null;
 
@@ -171,7 +181,8 @@ public class GridServiceProxy<T> implements Serializable {
                             GridClosureCallMode.BROADCAST,
                             new ServiceProxyCallable(mtd.getName(), name, 
mtd.getParameterTypes(), args),
                             Collections.singleton(node),
-                            false
+                            false,
+                            waitTimeout
                         ).get();
                     }
                 }
@@ -203,6 +214,9 @@ public class GridServiceProxy<T> implements Serializable {
 
                     throw new IgniteException(e);
                 }
+
+                if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= 
waitTimeout)
+                    throw new IgniteException("Service acquire timeout was 
reached, stopping. [timeout=" + waitTimeout + "]");
             }
         }
         finally {
@@ -246,7 +260,7 @@ public class GridServiceProxy<T> implements Serializable {
         if (hasLocNode && ctx.service().service(name) != null)
             return ctx.discovery().localNode();
 
-        Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name);
+        Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, 
waitTimeout);
 
         if (snapshot == null || snapshot.isEmpty())
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 3478c70..d89e80b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -68,6 +68,7 @@ import 
org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
+import 
org.apache.ignite.internal.processors.service.GridServiceNotFoundException;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1065,6 +1066,12 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
 
                         return null;
                     }
+                    else if (X.hasCause(e, GridServiceNotFoundException.class) 
||
+                        X.hasCause(e, ClusterTopologyCheckedException.class)) {
+                        // Should be throttled, because GridServiceProxy 
continuously retry getting service.
+                        LT.error(log, e, "Failed to obtain remote job result 
policy for result from " +
+                            "ComputeTask.result(..) method (will fail the 
whole task): " + jobRes);
+                    }
                     else
                         U.error(log, "Failed to obtain remote job result 
policy for result from " +
                             "ComputeTask.result(..) method (will fail the 
whole task): " + jobRes, e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
index c74daca..3ed91e8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java
@@ -88,7 +88,8 @@ public class IgniteComputeTopologyExceptionTest extends 
GridCommonAbstractTest {
                 }
             },
             nodes,
-            false);
+            false,
+            0);
 
         try {
             fut.get();
@@ -99,4 +100,4 @@ public class IgniteComputeTopologyExceptionTest extends 
GridCommonAbstractTest {
             log.info("Expected exception: " + e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java
new file mode 100644
index 0000000..41eef31
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.service;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryBasicIdMapper;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeTaskTimeoutException;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceConfiguration;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests service proxy timeouts.
+ */
+public class IgniteServiceProxyTimeoutInitializedTest extends 
GridCommonAbstractTest {
+    /** */
+    private static Service srvc;
+
+    /** */
+    private static CountDownLatch latch1;
+
+    /** */
+    private static CountDownLatch latch2;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(final String 
gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        final ServiceConfiguration scfg = new ServiceConfiguration();
+
+        if (gridName.endsWith("0")) {
+            scfg.setName("testService");
+            scfg.setService(srvc);
+            scfg.setMaxPerNodeCount(1);
+            scfg.setTotalCount(1);
+            scfg.setNodeFilter(new NodeFilter());
+
+            final Map<String, String> attrs = new HashMap<>();
+
+            attrs.put("clusterGroup", "0");
+
+            cfg.setUserAttributes(attrs);
+
+            cfg.setServiceConfiguration(scfg);
+        }
+
+        cfg.setMarshaller(null);
+
+        final BinaryConfiguration binCfg = new BinaryConfiguration();
+
+        // Despite defaults explicitly set to lower case.
+        binCfg.setIdMapper(new BinaryBasicIdMapper(true));
+
+        cfg.setBinaryConfiguration(binCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that we limit retries to get not available service by timeout.
+     *
+     * @throws Exception If fail.
+     */
+    @SuppressWarnings({"Convert2Lambda", "ThrowableResultOfMethodCallIgnored"})
+    public void testUnavailableService() throws Exception {
+        srvc = new TestWaitServiceImpl();
+
+        latch1 = new CountDownLatch(1);
+        latch2 = new CountDownLatch(1);
+
+        try {
+            GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    startGrid(0);
+
+                    return null;
+                }
+            });
+
+            assert latch1.await(1, TimeUnit.MINUTES);
+
+            final IgniteEx ignite1 = startGrid(1);
+
+            final TestService testSrvc = 
ignite1.services().serviceProxy("testService", TestService.class, false, 500);
+
+            GridTestUtils.assertThrows(null, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    testSrvc.test();
+
+                    return null;
+                }
+            }, IgniteException.class, null);
+        }
+        finally {
+            latch2.countDown();
+        }
+    }
+
+    /**
+     * Checks that service not hangs if timeout set. Here we get hang with 
marshalling exception.
+     *
+     * @throws Exception If fail.
+     */
+    @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"})
+    public void testServiceException() throws Exception {
+        srvc = new HangServiceImpl();
+
+        // Start service grid.
+        startGrid(0);
+        final IgniteEx ignite1 = startGrid(1);
+
+        final HangService testSrvc = 
ignite1.services().serviceProxy("testService", HangService.class, false, 1_000);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                testSrvc.hang();
+
+                return null;
+            }
+        }, ComputeTaskTimeoutException.class, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60_000;
+    }
+
+    /**
+     *
+     */
+    private static class NodeFilter implements IgnitePredicate<ClusterNode> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(final ClusterNode clusterNode) {
+            return "0".equals(clusterNode.attribute("clusterGroup"));
+        }
+    }
+
+    /**
+     *
+     */
+    private interface TestService {
+        /** */
+        void test();
+    }
+
+    /**
+     *
+     */
+    private static class TestWaitServiceImpl implements Service, TestService {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void test() {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(final ServiceContext ctx) {
+            // No-op
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(final ServiceContext ctx) throws Exception {
+            latch1.countDown();
+
+            // Simulate long initialization.
+            latch2.await(1, TimeUnit.MINUTES);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(final ServiceContext ctx) throws 
Exception {
+            // No-op
+        }
+    }
+
+    /**
+     *
+     */
+    private static class HangClass implements Binarylizable {
+
+        /** {@inheritDoc} */
+        @Override public void writeBinary(final BinaryWriter writer) throws 
BinaryObjectException {
+            try {
+                U.sleep(10_000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new BinaryObjectException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readBinary(final BinaryReader reader) throws 
BinaryObjectException {
+            try {
+                U.sleep(10_000);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw new BinaryObjectException(e);
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private interface HangService {
+        /**
+         * @return Hangs deserialization.
+         */
+        HangClass hang();
+    }
+
+    /**
+     *
+     */
+    private static class HangServiceImpl implements HangService, Service {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public HangClass hang() {
+            return new HangClass();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(final ServiceContext ctx) {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(final ServiceContext ctx) throws Exception {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(final ServiceContext ctx) throws 
Exception {
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index d9cc8c0..350b715 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -66,6 +66,7 @@ import 
org.apache.ignite.internal.processors.service.IgniteServiceDeployment2Cla
 import 
org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingDefaultMarshallerTest;
 import 
org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingJdkMarshallerTest;
 import 
org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest;
+import 
org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest;
 import 
org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest;
 import 
org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
 import 
org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
@@ -141,6 +142,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridServiceProxyNodeStopSelfTest.class);
         suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
         suite.addTestSuite(IgniteServiceReassignmentTest.class);
+        suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
 
         
suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
         
suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);

Reply via email to