IGNITE-3862 - Fix: GridServiceProxy invocation never times out
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb2f2a3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb2f2a3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb2f2a3 Branch: refs/heads/master Commit: eeb2f2a38fb792b711eb665e380d14bc00f6e078 Parents: 6d744db Author: dkarachentsev <[email protected]> Authored: Mon Dec 12 12:14:01 2016 +0300 Committer: dkarachentsev <[email protected]> Committed: Mon Dec 12 12:14:01 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteServices.java | 18 +- .../ignite/internal/IgniteServicesImpl.java | 11 +- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/cache/GridCacheAdapter.java | 6 +- .../CacheDataStructuresManager.java | 6 +- .../cache/query/GridCacheQueryManager.java | 4 +- .../closure/GridClosureProcessor.java | 32 ++- .../internal/processors/job/GridJobWorker.java | 7 + .../platform/services/PlatformServices.java | 2 +- .../service/GridServiceProcessor.java | 11 +- .../processors/service/GridServiceProxy.java | 18 +- .../processors/task/GridTaskWorker.java | 7 + .../IgniteComputeTopologyExceptionTest.java | 5 +- ...gniteServiceProxyTimeoutInitializedTest.java | 284 +++++++++++++++++++ .../testsuites/IgniteKernalSelfTestSuite.java | 2 + 15 files changed, 390 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/IgniteServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java index 08577c5..83fd487 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java +++ 
b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java @@ -350,6 +350,22 @@ public interface IgniteServices extends IgniteAsyncSupport { */ public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky) throws IgniteException; + /** + * Gets a remote handle on the service with timeout. If service is available locally, + * then local instance is returned and timeout ignored, otherwise, a remote proxy is dynamically + * created and provided for the specified service. + * + * @param name Service name. + * @param svcItf Interface for the service. + * @param sticky Whether or not Ignite should always contact the same remote + * service or try to load-balance between services. + * @param timeout If greater than 0 created proxy will wait for service availability only specified time, + * and will limit remote service invocation time. + * @return Either proxy over remote service or local service if it is deployed locally. + * @throws IgniteException If failed to create service proxy. + */ + public <T> T serviceProxy(String name, Class<? 
super T> svcItf, boolean sticky, long timeout) throws IgniteException; + /** {@inheritDoc} */ @Override public IgniteServices withAsync(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index b8042c3..400f28d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -222,14 +222,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer /** {@inheritDoc} */ @Override public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky) throws IgniteException { + return (T) serviceProxy(name, svcItf, sticky, 0); + } + + /** {@inheritDoc} */ + @Override public <T> T serviceProxy(final String name, final Class<? 
super T> svcItf, final boolean sticky, + final long timeout) throws IgniteException { A.notNull(name, "name"); A.notNull(svcItf, "svcItf"); A.ensure(svcItf.isInterface(), "Service class must be an interface: " + svcItf); + A.ensure(timeout >= 0, "Timeout cannot be negative: " + timeout); guard(); try { - return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky); + return (T)ctx.service().serviceProxy(prj, name, svcItf, sticky, timeout); } finally { unguard(); @@ -289,4 +296,4 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer protected Object readResolve() throws ObjectStreamException { return prj.services(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index b9182ae..b6efafb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -497,7 +497,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { private AffinityInfo affinityInfoFromNode(@Nullable String cacheName, AffinityTopologyVersion topVer, ClusterNode n) throws IgniteCheckedException { GridTuple3<GridAffinityMessage, GridAffinityMessage, GridAffinityAssignment> t = ctx.closure() - .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/).get(); + .callAsyncNoFailover(BROADCAST, affinityJob(cacheName, topVer), F.asList(n), true/*system pool*/, 0).get(); AffinityFunction f = (AffinityFunction)unmarshall(ctx, n.id(), 
t.get1()); AffinityKeyMapper m = (AffinityKeyMapper)unmarshall(ctx, n.id(), t.get2()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index b30ec70..4d59d50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -3628,13 +3628,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return ctx.closures().callAsyncNoFailover(BROADCAST, new LoadKeysCallableV2<>(ctx.name(), keys, update, plc, keepBinary), nodes, - true); + true, + 0); } else { return ctx.closures().callAsyncNoFailover(BROADCAST, new LoadKeysCallable<>(ctx.name(), keys, update, plc), nodes, - true); + true, + 0); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index c018f71..c1983df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -491,7 +491,8 @@ public class CacheDataStructuresManager extends 
GridCacheManagerAdapter { cctx.closures().callAsyncNoFailover(BROADCAST, new BlockSetCallable(cctx.name(), id), nodes, - true).get(); + true, + 0).get(); } catch (IgniteCheckedException e) { if (e.hasCause(ClusterTopologyCheckedException.class)) { @@ -514,7 +515,8 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { cctx.closures().callAsyncNoFailover(BROADCAST, new RemoveSetDataCallable(cctx.name(), id, topVer), nodes, - true).get(); + true, + 0).get(); } catch (IgniteCheckedException e) { if (e.hasCause(ClusterTopologyCheckedException.class)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index d4decb4..1165157 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -220,7 +220,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (detailMetricsSz > 0) detailMetrics = new ConcurrentHashMap8<>(detailMetricsSz); - + lsnr = new GridLocalEventListener() { @Override public void onEvent(Event evt) { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); @@ -2237,7 +2237,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // Get metadata from remote nodes. if (!nodes.isEmpty()) - rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, Collections.singleton(job), nodes, true); + rmtFut = cctx.closures().callAsyncNoFailover(BROADCAST, Collections.singleton(job), nodes, true, 0); // Get local metadata. 
IgniteInternalFuture<Collection<CacheSqlMetadata>> locFut = cctx.closures().callLocalSafe(job, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java index 9d295d3..3ed985e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -81,6 +81,7 @@ import static org.apache.ignite.compute.ComputeJobResultPolicy.FAILOVER; import static org.apache.ignite.compute.ComputeJobResultPolicy.REDUCE; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_TIMEOUT; /** * @@ -520,9 +521,15 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param <R> Type. * @return Grid future for collection of closure results. 
*/ - public <R> IgniteInternalFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job, - @Nullable Collection<ClusterNode> nodes, boolean sys) { + public <R> IgniteInternalFuture<R> callAsyncNoFailover( + GridClosureCallMode mode, + @Nullable Callable<R> job, + @Nullable Collection<ClusterNode> nodes, + boolean sys, + long timeout + ) { assert mode != null; + assert timeout >= 0 : timeout; busyLock.readLock(); @@ -536,6 +543,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_NO_FAILOVER, true); ctx.task().setThreadContext(TC_SUBGRID, nodes); + if (timeout > 0) + ctx.task().setThreadContext(TC_TIMEOUT, timeout); + return ctx.task().execute(new T7<>(mode, job), null, sys); } finally { @@ -548,13 +558,19 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param jobs Closures to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param timeout If greater than 0 limits task execution. Cannot be negative. * @param <R> Type. * @return Grid future for collection of closure results. */ - public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) { + public <R> IgniteInternalFuture<Collection<R>> callAsyncNoFailover( + GridClosureCallMode mode, + @Nullable Collection<? 
extends Callable<R>> jobs, + @Nullable Collection<ClusterNode> nodes, + boolean sys, + long timeout + ) { assert mode != null; + assert timeout >= 0 : timeout; busyLock.readLock(); @@ -568,6 +584,9 @@ public class GridClosureProcessor extends GridProcessorAdapter { ctx.task().setThreadContext(TC_NO_FAILOVER, true); ctx.task().setThreadContext(TC_SUBGRID, nodes); + if (timeout > 0) + ctx.task().setThreadContext(TC_TIMEOUT, timeout); + return ctx.task().execute(new T6<>(mode, jobs), null, sys); } finally { @@ -580,6 +599,7 @@ public class GridClosureProcessor extends GridProcessorAdapter { * @param job Closure to execute. * @param nodes Grid nodes. * @param sys If {@code true}, then system pool will be used. + * @param timeout If greater than 0 limits task execution. Cannot be negative. * @param <R> Type. * @return Grid future for collection of closure results. */ @@ -2304,4 +2324,4 @@ public class GridClosureProcessor extends GridProcessorAdapter { return S.toString(C4MLAV2.class, this, super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index f5c6a27..9bee849 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -43,14 +43,17 @@ import org.apache.ignite.internal.GridJobExecuteResponse; import org.apache.ignite.internal.GridJobSessionImpl; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; @@ -587,6 +590,10 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { else U.warn(log, msg); } + else if (X.hasCause(e, GridServiceNotFoundException.class) || + X.hasCause(e, ClusterTopologyCheckedException.class)) + // Should be throttled, because GridServiceProxy continuously retry getting service. 
+ LT.error(log, e, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']'); else U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 962a4c0..c266986 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -340,7 +340,7 @@ public class PlatformServices extends PlatformAbstractTarget { Object proxy = PlatformService.class.isAssignableFrom(d.serviceClass()) ? 
services.serviceProxy(name, PlatformService.class, sticky) - : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, + : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, 0, platformCtx.kernalContext()); return new ServiceProxyHolder(proxy, d.serviceClass()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index b9b92b8..3690f35 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -694,9 +694,10 @@ public class GridServiceProcessor extends GridProcessorAdapter { /** * @param name Service name. + * @param timeout If greater than 0 limits task execution time. Cannot be negative. * @return Service topology. */ - public Map<UUID, Integer> serviceTopology(String name) throws IgniteCheckedException { + public Map<UUID, Integer> serviceTopology(String name, long timeout) throws IgniteCheckedException { ClusterNode node = cache.affinity().mapKeyToNode(name); if (node.version().compareTo(ServiceTopologyCallable.SINCE_VER) >= 0) { @@ -708,7 +709,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { GridClosureCallMode.BROADCAST, call, Collections.singletonList(node), - false + false, + timeout ).get(); } else @@ -828,12 +830,13 @@ public class GridServiceProcessor extends GridProcessorAdapter { * @param name Service name. * @param svcItf Service class. * @param sticky Whether multi-node request should be done. 
+ * @param timeout If greater than 0 limits service acquire time. Cannot be negative. * @param <T> Service interface type. * @return The proxy of a service by its name and class. * @throws IgniteException If failed to create proxy. */ @SuppressWarnings("unchecked") - public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf, boolean sticky) + public <T> T serviceProxy(ClusterGroup prj, String name, Class<? super T> svcItf, boolean sticky, long timeout) throws IgniteException { if (hasLocalNode(prj)) { ServiceContextImpl ctx = serviceContext(name); @@ -851,7 +854,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } - return new GridServiceProxy<T>(prj, name, svcItf, sticky, ctx).proxy(); + return new GridServiceProxy<T>(prj, name, svcItf, sticky, timeout, ctx).proxy(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index 564a13a..aa60934 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -84,11 +84,15 @@ public class GridServiceProxy<T> implements Serializable { /** Whether multi-node request should be done. */ private final boolean sticky; + /** Service availability wait timeout. */ + private final long waitTimeout; + /** * @param prj Grid projection. * @param name Service name. * @param svc Service type class. * @param sticky Whether multi-node request should be done. + * @param timeout Service availability wait timeout. Cannot be negative. * @param ctx Context. 
*/ @SuppressWarnings("unchecked") @@ -96,12 +100,16 @@ public class GridServiceProxy<T> implements Serializable { String name, Class<? super T> svc, boolean sticky, + long timeout, GridKernalContext ctx) { + assert timeout >= 0 : timeout; + this.prj = prj; this.ctx = ctx; this.name = name; this.sticky = sticky; + this.waitTimeout = timeout; hasLocNode = hasLocalNode(prj); log = ctx.log(getClass()); @@ -145,6 +153,8 @@ public class GridServiceProxy<T> implements Serializable { ctx.gateway().readLock(); try { + final long startTime = U.currentTimeMillis(); + while (true) { ClusterNode node = null; @@ -171,7 +181,8 @@ public class GridServiceProxy<T> implements Serializable { GridClosureCallMode.BROADCAST, new ServiceProxyCallable(mtd.getName(), name, mtd.getParameterTypes(), args), Collections.singleton(node), - false + false, + waitTimeout ).get(); } } @@ -203,6 +214,9 @@ public class GridServiceProxy<T> implements Serializable { throw new IgniteException(e); } + + if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout) + throw new IgniteException("Service acquire timeout was reached, stopping. 
[timeout=" + waitTimeout + "]"); } } finally { @@ -246,7 +260,7 @@ public class GridServiceProxy<T> implements Serializable { if (hasLocNode && ctx.service().service(name) != null) return ctx.discovery().localNode(); - Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name); + Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, waitTimeout); if (snapshot == null || snapshot.isEmpty()) return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 3478c70..d89e80b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.closure.AffinityTask; +import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; @@ -1065,6 +1066,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { return null; } + else if (X.hasCause(e, GridServiceNotFoundException.class) || + X.hasCause(e, ClusterTopologyCheckedException.class)) { + // Should be throttled, because GridServiceProxy continuously retry getting service. 
+ LT.error(log, e, "Failed to obtain remote job result policy for result from " + + "ComputeTask.result(..) method (will fail the whole task): " + jobRes); + } else U.error(log, "Failed to obtain remote job result policy for result from " + "ComputeTask.result(..) method (will fail the whole task): " + jobRes, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java index c74daca..3ed91e8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteComputeTopologyExceptionTest.java @@ -88,7 +88,8 @@ public class IgniteComputeTopologyExceptionTest extends GridCommonAbstractTest { } }, nodes, - false); + false, + 0); try { fut.get(); @@ -99,4 +100,4 @@ public class IgniteComputeTopologyExceptionTest extends GridCommonAbstractTest { log.info("Expected exception: " + e); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java new file mode 100644 index 0000000..41eef31 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/IgniteServiceProxyTimeoutInitializedTest.java @@ -0,0 
+1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.service; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.binary.BinaryBasicIdMapper; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.compute.ComputeTaskTimeoutException; +import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.services.Service; +import org.apache.ignite.services.ServiceConfiguration; +import org.apache.ignite.services.ServiceContext; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import java.util.HashMap; +import java.util.Map; +import 
java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests service proxy timeouts. + */ +public class IgniteServiceProxyTimeoutInitializedTest extends GridCommonAbstractTest { + /** */ + private static Service srvc; + + /** */ + private static CountDownLatch latch1; + + /** */ + private static CountDownLatch latch2; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(final String gridName) throws Exception { + final IgniteConfiguration cfg = super.getConfiguration(gridName); + + final ServiceConfiguration scfg = new ServiceConfiguration(); + + if (gridName.endsWith("0")) { + scfg.setName("testService"); + scfg.setService(srvc); + scfg.setMaxPerNodeCount(1); + scfg.setTotalCount(1); + scfg.setNodeFilter(new NodeFilter()); + + final Map<String, String> attrs = new HashMap<>(); + + attrs.put("clusterGroup", "0"); + + cfg.setUserAttributes(attrs); + + cfg.setServiceConfiguration(scfg); + } + + cfg.setMarshaller(null); + + final BinaryConfiguration binCfg = new BinaryConfiguration(); + + // Despite defaults explicitly set to lower case. + binCfg.setIdMapper(new BinaryBasicIdMapper(true)); + + cfg.setBinaryConfiguration(binCfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Checks that we limit retries to get not available service by timeout. + * + * @throws Exception If fail. 
+ */ + @SuppressWarnings({"Convert2Lambda", "ThrowableResultOfMethodCallIgnored"}) + public void testUnavailableService() throws Exception { + srvc = new TestWaitServiceImpl(); + + latch1 = new CountDownLatch(1); + latch2 = new CountDownLatch(1); + + try { + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + startGrid(0); + + return null; + } + }); + + assert latch1.await(1, TimeUnit.MINUTES); + + final IgniteEx ignite1 = startGrid(1); + + final TestService testSrvc = ignite1.services().serviceProxy("testService", TestService.class, false, 500); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + testSrvc.test(); + + return null; + } + }, IgniteException.class, null); + } + finally { + latch2.countDown(); + } + } + + /** + * Checks that service not hangs if timeout set. Here we get hang with marshalling exception. + * + * @throws Exception If fail. + */ + @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "Convert2Lambda"}) + public void testServiceException() throws Exception { + srvc = new HangServiceImpl(); + + // Start service grid. 
+ startGrid(0); + final IgniteEx ignite1 = startGrid(1); + + final HangService testSrvc = ignite1.services().serviceProxy("testService", HangService.class, false, 1_000); + + GridTestUtils.assertThrows(null, new Callable<Object>() { + @Override public Object call() throws Exception { + testSrvc.hang(); + + return null; + } + }, ComputeTaskTimeoutException.class, null); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60_000; + } + + /** + * + */ + private static class NodeFilter implements IgnitePredicate<ClusterNode> { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(final ClusterNode clusterNode) { + return "0".equals(clusterNode.attribute("clusterGroup")); + } + } + + /** + * + */ + private interface TestService { + /** */ + void test(); + } + + /** + * + */ + private static class TestWaitServiceImpl implements Service, TestService { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void test() { + // No-op + } + + /** {@inheritDoc} */ + @Override public void cancel(final ServiceContext ctx) { + // No-op + } + + /** {@inheritDoc} */ + @Override public void init(final ServiceContext ctx) throws Exception { + latch1.countDown(); + + // Simulate long initialization. 
+ latch2.await(1, TimeUnit.MINUTES); + } + + /** {@inheritDoc} */ + @Override public void execute(final ServiceContext ctx) throws Exception { + // No-op + } + } + + /** + * + */ + private static class HangClass implements Binarylizable { + + /** {@inheritDoc} */ + @Override public void writeBinary(final BinaryWriter writer) throws BinaryObjectException { + try { + U.sleep(10_000); + } + catch (IgniteInterruptedCheckedException e) { + throw new BinaryObjectException(e); + } + } + + /** {@inheritDoc} */ + @Override public void readBinary(final BinaryReader reader) throws BinaryObjectException { + try { + U.sleep(10_000); + } + catch (IgniteInterruptedCheckedException e) { + throw new BinaryObjectException(e); + } + } + } + + /** + * + */ + private interface HangService { + /** + * @return Hangs deserialization. + */ + HangClass hang(); + } + + /** + * + */ + private static class HangServiceImpl implements HangService, Service { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public HangClass hang() { + return new HangClass(); + } + + /** {@inheritDoc} */ + @Override public void cancel(final ServiceContext ctx) { + + } + + /** {@inheritDoc} */ + @Override public void init(final ServiceContext ctx) throws Exception { + + } + + /** {@inheritDoc} */ + @Override public void execute(final ServiceContext ctx) throws Exception { + + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb2f2a3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index d9cc8c0..350b715 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.service.IgniteServiceDeployment2Cla import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingDefaultMarshallerTest; import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingJdkMarshallerTest; import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest; +import org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest; import org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest; import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest; import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest; @@ -141,6 +142,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite { suite.addTestSuite(GridServiceProxyNodeStopSelfTest.class); suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class); suite.addTestSuite(IgniteServiceReassignmentTest.class); + suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);
