http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java deleted file mode 100644 index 349b2bf..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessor.java +++ /dev/null @@ -1,1744 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.worker.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.compute.ComputeJobResultPolicy.*; -import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; - -/** - * - */ -public class GridClosureProcessor extends GridProcessorAdapter { - /** */ - private final Executor sysPool; - - /** */ - private final Executor pubPool; - - /** */ - private final Executor ggfsPool; - - /** Lock to control execution after stop. */ - private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); - - /** Workers count. */ - private final LongAdder workersCnt = new LongAdder(); - - /** - * @param ctx Kernal context. 
- */ - public GridClosureProcessor(GridKernalContext ctx) { - super(ctx); - - sysPool = ctx.config().getSystemExecutorService(); - pubPool = ctx.config().getExecutorService(); - ggfsPool = ctx.config().getGgfsExecutorService(); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - if (log.isDebugEnabled()) - log.debug("Started closure processor."); - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public void onKernalStop(boolean cancel) { - busyLock.writeLock(); - - boolean interrupted = Thread.interrupted(); - - while (workersCnt.sum() != 0) { - try { - Thread.sleep(200); - } - catch (InterruptedException ignored) { - interrupted = true; - } - } - - if (interrupted) - Thread.currentThread().interrupt(); - - if (log.isDebugEnabled()) - log.debug("Stopped closure processor."); - } - - /** - * @throws IllegalStateException If grid is stopped. - */ - private void enterBusy() throws IllegalStateException { - if (!busyLock.tryReadLock()) - throw new IllegalStateException("Closure processor cannot be used on stopped grid: " + ctx.gridName()); - } - - /** - * Unlocks busy lock. - */ - private void leaveBusy() { - busyLock.readUnlock(); - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param nodes Grid nodes. - * @return Task execution future. - */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, - @Nullable Collection<ClusterNode> nodes) { - return runAsync(mode, jobs, nodes, false); - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param nodes Grid nodes. - * @param sys If {@code true}, then system pool will be used. - * @return Task execution future. - */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? 
extends Runnable> jobs, - @Nullable Collection<ClusterNode> nodes, boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (F.isEmpty(jobs)) - return new GridFinishedFuture(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T1(mode, jobs), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * @param mode Distribution mode. - * @param job Closure to execute. - * @param nodes Grid nodes. - * @return Task execution future. - */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, - @Nullable Collection<ClusterNode> nodes) { - return runAsync(mode, job, nodes, false); - } - - /** - * @param mode Distribution mode. - * @param job Closure to execute. - * @param nodes Grid nodes. - * @param sys If {@code true}, then system pool will be used. - * @return Task execution future. - */ - public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, - @Nullable Collection<ClusterNode> nodes, boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (job == null) - return new GridFinishedFuture(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T2(mode, job), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * Maps {@link Runnable} jobs to specified nodes based on distribution mode. - * - * @param mode Distribution mode. - * @param jobs Closures to map. - * @param nodes Grid nodes. - * @param lb Load balancer. - * @throws IgniteCheckedException Thrown in case of any errors. - * @return Mapping. - */ - private Map<ComputeJob, ClusterNode> absMap(GridClosureCallMode mode, Collection<? 
extends Runnable> jobs, - Collection<ClusterNode> nodes, ComputeLoadBalancer lb) throws IgniteCheckedException { - assert mode != null; - assert jobs != null; - assert nodes != null; - assert lb != null; - - if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) { - Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1); - - JobMapper mapper = new JobMapper(map); - - switch (mode) { - case BROADCAST: { - for (ClusterNode n : nodes) - for (Runnable r : jobs) - mapper.map(job(r), n); - - break; - } - - case BALANCE: { - for (Runnable r : jobs) { - ComputeJob job = job(r); - - mapper.map(job, lb.getBalancedNode(job, null)); - } - - break; - } - } - - return map; - } - else - return Collections.emptyMap(); - } - - /** - * Maps {@link Callable} jobs to specified nodes based on distribution mode. - * - * @param mode Distribution mode. - * @param jobs Closures to map. - * @param nodes Grid nodes. - * @param lb Load balancer. - * @throws IgniteCheckedException Thrown in case of any errors. - * @return Mapping. - */ - private <R> Map<ComputeJob, ClusterNode> outMap(GridClosureCallMode mode, - Collection<? extends Callable<R>> jobs, Collection<ClusterNode> nodes, ComputeLoadBalancer lb) - throws IgniteCheckedException { - assert mode != null; - assert jobs != null; - assert nodes != null; - assert lb != null; - - if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) { - Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1); - - JobMapper mapper = new JobMapper(map); - - switch (mode) { - case BROADCAST: { - for (ClusterNode n : nodes) - for (Callable<R> c : jobs) - mapper.map(job(c), n); - - break; - } - - case BALANCE: { - for (Callable<R> c : jobs) { - ComputeJob job = job(c); - - mapper.map(job, lb.getBalancedNode(job, null)); - } - - break; - } - } - - return map; - } - else - return Collections.emptyMap(); - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param rdc Reducer. - * @param nodes Grid nodes. - * @param <R1> Type. 
- * @param <R2> Type. - * @return Reduced result. - */ - public <R1, R2> IgniteFuture<R2> forkjoinAsync(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R1>> jobs, - @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { - assert mode != null; - - enterBusy(); - - try { - if (F.isEmpty(jobs) || rdc == null) - return new GridFinishedFuture<>(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T3<>(mode, jobs, rdc), null); - } - finally { - leaveBusy(); - } - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param nodes Grid nodes. - * @param <R> Type. - * @return Grid future for collection of closure results. - */ - public <R> IgniteFuture<Collection<R>> callAsync( - GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R>> jobs, - @Nullable Collection<ClusterNode> nodes) { - return callAsync(mode, jobs, nodes, false); - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param nodes Grid nodes. - * @param sys If {@code true}, then system pool will be used. - * @param <R> Type. - * @return Grid future for collection of closure results. - */ - public <R> IgniteFuture<Collection<R>> callAsync(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (F.isEmpty(jobs)) - return new GridFinishedFuture<>(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T6<>(mode, jobs), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * - * @param mode Distribution mode. - * @param job Closure to execute. - * @param nodes Grid nodes. 
- * @param <R> Type. - * @return Grid future for collection of closure results. - */ - public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, - @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) { - return callAsync(mode, job, nodes, false); - } - - /** - * @param cacheName Cache name. - * @param affKey Affinity key. - * @param job Job. - * @param nodes Grid nodes. - * @return Job future. - */ - public <R> IgniteFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - // In case cache key is passed instead of affinity key. - final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - finally { - leaveBusy(); - } - } - - /** - * @param cacheName Cache name. - * @param affKey Affinity key. - * @param job Job. - * @param nodes Grid nodes. - * @return Job future. - */ - public IgniteFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - // In case cache key is passed instead of affinity key. - final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T4(cacheName, affKey0, job), null, false); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(ctx, e); - } - finally { - leaveBusy(); - } - } - - /** - * @param mode Distribution mode. - * @param job Closure to execute. - * @param nodes Grid nodes. 
- * @param sys If {@code true}, then system pool will be used. - * @param <R> Type. - * @return Grid future for collection of closure results. - */ - public <R> IgniteFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job, - @Nullable Collection<ClusterNode> nodes, boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (job == null) - return new GridFinishedFuture<>(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_NO_FAILOVER, true); - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T7<>(mode, job), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * @param mode Distribution mode. - * @param jobs Closures to execute. - * @param nodes Grid nodes. - * @param sys If {@code true}, then system pool will be used. - * @param <R> Type. - * @return Grid future for collection of closure results. - */ - public <R> IgniteFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode, - @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, - boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (F.isEmpty(jobs)) - return new GridFinishedFuture<>(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_NO_FAILOVER, true); - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T6<>(mode, jobs), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * @param mode Distribution mode. - * @param job Closure to execute. - * @param nodes Grid nodes. - * @param sys If {@code true}, then system pool will be used. - * @param <R> Type. - * @return Grid future for collection of closure results. 
- */ - public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, - @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) { - assert mode != null; - - enterBusy(); - - try { - if (job == null) - return new GridFinishedFuture<>(ctx); - - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T7<>(mode, job), null, sys); - } - finally { - leaveBusy(); - } - } - - /** - * @param job Job closure. - * @param arg Optional job argument. - * @param nodes Grid nodes. - * @return Grid future for execution result. - */ - public <T, R> IgniteFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T8<>(job, arg), null, false); - } - finally { - leaveBusy(); - } - } - - /** - * @param job Job closure. - * @param arg Optional job argument. - * @param nodes Grid nodes. - * @return Grid future for execution result. - */ - public <T, R> IgniteFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T11<>(job, arg, nodes), null, false); - } - finally { - leaveBusy(); - } - } - - /** - * @param job Job closure. - * @param arg Optional job argument. - * @param nodes Grid nodes. - * @return Grid future for execution result. 
- */ - public <T, R> IgniteFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - ctx.task().setThreadContext(TC_NO_FAILOVER, true); - - return ctx.task().execute(new T11<>(job, arg, nodes), null, false); - } - finally { - leaveBusy(); - } - } - - /** - * @param job Job closure. - * @param args Job arguments. - * @param nodes Grid nodes. - * @return Grid future for execution result. - */ - public <T, R> IgniteFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, - @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T9<>(job, args), null, false); - } - finally { - leaveBusy(); - } - } - - /** - * @param job Job closure. - * @param args Job arguments. - * @param rdc Reducer. - * @param nodes Grid nodes. - * @return Grid future for execution result. - */ - public <T, R1, R2> IgniteFuture<R2> callAsync(IgniteClosure<T, R1> job, - Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { - enterBusy(); - - try { - if (F.isEmpty(nodes)) - return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); - - ctx.task().setThreadContext(TC_SUBGRID, nodes); - - return ctx.task().execute(new T10<>(job, args, rdc), null, false); - } - finally { - leaveBusy(); - } - } - - /** - * Gets pool by execution policy. - * - * @param plc Whether to get system or public pool. - * @return Requested worker pool. 
- */ - private Executor pool(GridClosurePolicy plc) { - switch (plc) { - case PUBLIC_POOL: - return pubPool; - - case SYSTEM_POOL: - return sysPool; - - case GGFS_POOL: - return ggfsPool; - - default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); - } - } - - /** - * Gets pool name by execution policy. - * - * @param plc Policy to choose executor pool. - * @return Pool name. - */ - private String poolName(GridClosurePolicy plc) { - switch (plc) { - case PUBLIC_POOL: - return "public"; - - case SYSTEM_POOL: - return "system"; - - case GGFS_POOL: - return "ggfs"; - - default: - throw new IllegalArgumentException("Invalid closure execution policy: " + plc); - } - } - - /** - * @param c Closure to execute. - * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. - * @return Future. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private IgniteFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException { - return runLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); - } - - /** - * @param c Closure to execute. - * @param plc Whether to run on system or public pool. - * @return Future. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private IgniteFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException { - if (c == null) - return new GridFinishedFuture(ctx); - - enterBusy(); - - try { - // Inject only if needed. 
- if (!(c instanceof GridPlainRunnable)) - ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c); - - final ClassLoader ldr = Thread.currentThread().getContextClassLoader(); - - final GridWorkerFuture fut = new GridWorkerFuture(ctx); - - workersCnt.increment(); - - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { - @Override protected void body() { - try { - if (ldr != null) - U.wrapThreadLoader(ldr, c); - else - c.run(); - - fut.onDone(); - } - catch (Throwable e) { - if (e instanceof Error) - U.error(log, "Closure execution failed with error.", e); - - fut.onDone(U.cast(e)); - } - finally { - workersCnt.decrement(); - } - } - }; - - fut.setWorker(w); - - try { - pool(plc).execute(w); - } - catch (RejectedExecutionException e) { - U.error(log, "Failed to execute worker due to execution rejection " + - "(increase upper bound on " + poolName(plc) + " executor service).", e); - - w.run(); - } - - return fut; - } - finally { - leaveBusy(); - } - } - - /** - * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but - * in case of rejected execution re-runs the closure in the current thread (blocking). - * - * @param c Closure to execute. - * @return Future. - */ - public IgniteFuture<?> runLocalSafe(Runnable c) { - return runLocalSafe(c, true); - } - - /** - * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs - * the closure in the current thread (blocking). - * - * @param c Closure to execute. - * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. - * @return Future. - */ - public IgniteFuture<?> runLocalSafe(Runnable c, boolean sys) { - return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); - } - - /** - * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs - * the closure in the current thread (blocking). 
- * - * @param c Closure to execute. - * @param plc Policy to choose executor pool. - * @return Future. - */ - public IgniteFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { - try { - return runLocal(c, plc); - } - catch (Throwable e) { - if (e instanceof Error) - U.error(log, "Closure execution failed with error.", e); - - // If execution was rejected - rerun locally. - if (e.getCause() instanceof RejectedExecutionException) { - U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc + - ", closure=" + c + ']'); - - try { - c.run(); - - return new GridFinishedFuture(ctx); - } - catch (Throwable t) { - if (t instanceof Error) - U.error(log, "Closure execution failed with error.", t); - - return new GridFinishedFuture(ctx, U.cast(t)); - } - } - // If failed for other reasons - return error future. - else - return new GridFinishedFuture(ctx, U.cast(e)); - } - } - - /** - * @param c Closure to execute. - * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. - * @return Future. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { - return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); - } - - /** - * @param c Closure to execute. - * @param plc Whether to run on system or public pool. - * @param <R> Type of closure return value. - * @return Future. - * @throws IgniteCheckedException Thrown in case of any errors. - */ - private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { - if (c == null) - return new GridFinishedFuture<>(ctx); - - enterBusy(); - - try { - // Inject only if needed. 
- if (!(c instanceof GridPlainCallable)) - ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c); - - final ClassLoader ldr = Thread.currentThread().getContextClassLoader(); - - final GridWorkerFuture<R> fut = new GridWorkerFuture<>(ctx); - - workersCnt.increment(); - - GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { - @Override protected void body() { - try { - if (ldr != null) - fut.onDone(U.wrapThreadLoader(ldr, c)); - else - fut.onDone(c.call()); - } - catch (Throwable e) { - if (e instanceof Error) - U.error(log, "Closure execution failed with error.", e); - - fut.onDone(U.cast(e)); - } - finally { - workersCnt.decrement(); - } - } - }; - - fut.setWorker(w); - - try { - pool(plc).execute(w); - } - catch (RejectedExecutionException e) { - U.error(log, "Failed to execute worker due to execution rejection " + - "(increase upper bound on " + poolName(plc) + " executor service).", e); - - w.run(); - } - - return fut; - } - finally { - leaveBusy(); - } - } - - /** - * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)} - * but in case of rejected execution re-runs the closure in the current thread (blocking). - * - * @param c Closure to execute. - * @return Future. - */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c) { - return callLocalSafe(c, true); - } - - /** - * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)} - * but in case of rejected execution re-runs the closure in the current thread (blocking). - * - * @param c Closure to execute. - * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. - * @return Future. - */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, boolean sys) { - return callLocalSafe(c, sys ? 
GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); - } - - /** - * Companion to {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs - * the closure in the current thread (blocking). - * - * @param c Closure to execute. - * @param plc Policy to choose executor pool. - * @return Future. - */ - public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { - try { - return callLocal(c, plc); - } - catch (IgniteCheckedException e) { - // If execution was rejected - rerun locally. - if (e.getCause() instanceof RejectedExecutionException) { - U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc + - ", closure=" + c + ']'); - - try { - return new GridFinishedFuture<>(ctx, c.call()); - } - // If failed again locally - return error future. - catch (Exception e2) { - return new GridFinishedFuture<>(ctx, U.cast(e2)); - } - } - // If failed for other reasons - return error future. - else - return new GridFinishedFuture<>(ctx, U.cast(e)); - } - } - - /** - * Converts given closure with arguments to grid job. - * @param job Job. - * @param arg Optional argument. - * @return Job. - */ - @SuppressWarnings("IfMayBeConditional") - private <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) { - A.notNull(job, "job"); - - if (job instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - return job.apply(arg); - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Nullable @Override public Object execute() { - return job.apply(arg); - } - }; - } - } - - /** - * Converts given closure to a grid job. - * - * @param c Closure to convert to grid job. - * @return Grid job made out of closure. 
- */ - @SuppressWarnings("IfMayBeConditional") - private ComputeJob job(final Callable<?> c) { - A.notNull(c, "job"); - - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - }; - } - } - - /** - * Converts given closure to a grid job. - * - * @param c Closure to convert to grid job. - * @param cacheName Cache name. - * @param affKey Affinity key. - * @return Grid job made out of closure. - */ - @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"}) - private ComputeJob job(final Callable<?> c, @Nullable final String cacheName, final Object affKey) { - A.notNull(c, "job"); - - if (c instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Override public Object execute() { - try { - return c.call(); - } - catch (Exception e) { - throw new IgniteException(e); - } - } 
- }; - } - } - - /** - * Converts given closure to a grid job. - * - * @param r Closure to convert to grid job. - * @return Grid job made out of closure. - */ - @SuppressWarnings("IfMayBeConditional") - private static ComputeJob job(final Runnable r) { - A.notNull(r, "job"); - - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - }; - } - } - - /** - * Converts given closure to a grid job. - * - * @param r Closure to convert to grid job. - * @param cacheName Cache name. - * @param affKey Affinity key. - * @return Grid job made out of closure. - */ - @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"}) - private ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) { - A.notNull(r, "job"); - - if (r instanceof ComputeJobMasterLeaveAware) { - return new GridMasterLeaveAwareComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - - @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { - ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); - } - }; - } - else { - return new ComputeJobAdapter() { - /** */ - @GridCacheName - private final String cn = cacheName; - - /** */ - @GridCacheAffinityKeyMapped - private final Object ak = affKey; - - @Nullable @Override public Object execute() { - r.run(); - - return null; - } - }; - } - } - - /** */ - private class JobMapper { - /** 
*/ - private final Map<ComputeJob, ClusterNode> map; - - /** */ - private boolean hadLocNode; - - /** - * @param map Jobs map. - */ - private JobMapper(Map<ComputeJob, ClusterNode> map) { - assert map != null; - assert map.isEmpty(); - - this.map = map; - } - - /** - * @param job Job. - * @param node Node. - * @throws IgniteCheckedException In case of error. - */ - public void map(ComputeJob job, ClusterNode node) throws IgniteCheckedException { - assert job != null; - assert node != null; - - if (ctx.localNodeId().equals(node.id())) { - if (hadLocNode) { - IgniteMarshaller marsh = ctx.config().getMarshaller(); - - job = marsh.unmarshal(marsh.marshal(job), null); - } - else - hadLocNode = true; - } - - map.put(job, node); - } - } - - /** - * No-reduce task adapter. - */ - private abstract static class TaskNoReduceAdapter<T> extends GridPeerDeployAwareTaskAdapter<T, Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** - * @param pda Peer deploy aware instance. - */ - protected TaskNoReduceAdapter(@Nullable GridPeerDeployAware pda) { - super(pda); - } - - /** {@inheritDoc} */ - @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } - } - - /** - * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}. - */ - private class T1 extends TaskNoReduceAdapter<Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** */ - private IgniteBiTuple<GridClosureCallMode, Collection<? extends Runnable>> t; - - /** - * @param mode Call mode. - * @param jobs Collection of jobs. - */ - private T1(GridClosureCallMode mode, Collection<? extends Runnable> jobs) { - super(U.peerDeployAware0(jobs)); - - t = F.< - GridClosureCallMode, - Collection<? 
extends Runnable> - >t(mode, jobs); - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - return absMap(t.get1(), t.get2(), subgrid, lb); - } - } - - /** - * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}. - */ - private class T2 extends TaskNoReduceAdapter<Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** */ - private IgniteBiTuple<GridClosureCallMode, Runnable> t; - - /** - * @param mode Call mode. - * @param job Job. - */ - private T2(GridClosureCallMode mode, Runnable job) { - super(U.peerDeployAware(job)); - - t = F.t(mode, job); - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - return absMap(t.get1(), F.asList(t.get2()), subgrid, lb); - } - } - - /** - * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection)} - */ - private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** */ - private GridTuple3<GridClosureCallMode, - Collection<? extends Callable<R1>>, - IgniteReducer<R1, R2> - > t; - - /** - * - * @param mode Call mode. - * @param jobs Collection of jobs. - * @param rdc Reducer. - */ - private T3(GridClosureCallMode mode, Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc) { - super(U.peerDeployAware0(jobs)); - - t = F.< - GridClosureCallMode, - Collection<? 
extends Callable<R1>>, - IgniteReducer<R1, R2> - >t(mode, jobs, rdc); - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - return outMap(t.get1(), t.get2(), subgrid, lb); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws IgniteCheckedException { - ComputeJobResultPolicy resPlc = super.result(res, rcvd); - - if (res.getException() == null && resPlc != FAILOVER && !t.get3().collect((R1)res.getData())) - resPlc = REDUCE; // If reducer returned false - reduce right away. - - return resPlc; - } - - /** {@inheritDoc} */ - @Override public R2 reduce(List<ComputeJobResult> res) { - return t.get3().reduce(); - } - } - - /** - */ - private class T4 extends TaskNoReduceAdapter<Void> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final String cacheName; - - /** */ - private Object affKey; - - /** */ - private Runnable job; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param cacheName Cache name. - * @param affKey Affinity key. - * @param job Job. - */ - private T4(@Nullable String cacheName, Object affKey, Runnable job) { - super(U.peerDeployAware0(job)); - - this.cacheName = cacheName; - this.affKey = affKey; - this.job = job; - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - ComputeJob job = job(this.job, cacheName, affKey); - - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); - } - } - - /** - */ - private class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final String cacheName; - - /** */ - private Object affKey; - - /** */ - private Callable<R> job; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param cacheName Cache name. - * @param affKey Affinity key. - * @param job Job. - */ - private T5(@Nullable String cacheName, Object affKey, Callable<R> job) { - super(U.peerDeployAware0(job)); - - this.cacheName = cacheName; - this.affKey = affKey; - this.job = job; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - ComputeJob job = job(this.job, cacheName, affKey); - - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); - } - - /** {@inheritDoc} */ - @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { - for (ComputeJobResult r : res) { - if (r.getException() == null) - return r.getData(); - } - - throw new IgniteCheckedException("Failed to find successful job result: " + res); - } - } - - /** - * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)} - */ - private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final GridClosureCallMode mode; - - /** */ - private final Collection<? 
extends Callable<R>> jobs; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * - * @param mode Call mode. - * @param jobs Collection of jobs. - */ - private T6( - GridClosureCallMode mode, - Collection<? extends Callable<R>> jobs) { - super(U.peerDeployAware0(jobs)); - - this.mode = mode; - this.jobs = jobs; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - return outMap(mode, jobs, subgrid, lb); - } - - /** {@inheritDoc} */ - @Override public Collection<R> reduce(List<ComputeJobResult> res) { - return F.jobResults(res); - } - } - - /** - * Task that is free of dragged in enclosing context for the method - * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)} - */ - private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteBiTuple<GridClosureCallMode, Callable<R>> t; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param mode Call mode. - * @param job Job. - */ - private T7(GridClosureCallMode mode, Callable<R> job) { - super(U.peerDeployAware(job)); - - t = F.t(mode, job); - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - return outMap(t.get1(), F.asList(t.get2()), subgrid, lb); - } - - /** {@inheritDoc} */ - @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { - for (ComputeJobResult r : res) - if (r.getException() == null) - return r.getData(); - - throw new IgniteCheckedException("Failed to find successful job result: " + res); - } - } - - /** - */ - private class T8<T, R> extends GridPeerDeployAwareTaskAdapter<Void, R> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteClosure<T, R> job; - - /** */ - private T arg; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param job Job. - * @param arg Optional job argument. - */ - private T8(IgniteClosure<T, R> job, @Nullable T arg) { - super(U.peerDeployAware(job)); - - this.job = job; - this.arg = arg; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - ComputeJob job = job(this.job, this.arg); - - return Collections.singletonMap(job, lb.getBalancedNode(job, null)); - } - - /** {@inheritDoc} */ - @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { - for (ComputeJobResult r : res) - if (r.getException() == null) - return r.getData(); - - throw new IgniteCheckedException("Failed to find successful job result: " + res); - } - } - - /** - */ - private class T9<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteClosure<T, R> job; - - /** */ - private Collection<? extends T> args; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param job Job. - * @param args Job arguments. 
- */ - private T9(IgniteClosure<T, R> job, Collection<? extends T> args) { - super(U.peerDeployAware(job)); - - this.job = job; - this.args = args; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - Map<ComputeJob, ClusterNode> map = new HashMap<>(args.size(), 1); - - JobMapper mapper = new JobMapper(map); - - for (T jobArg : args) { - ComputeJob job = job(this.job, jobArg); - - mapper.map(job, lb.getBalancedNode(job, null)); - } - - return map; - } - - /** {@inheritDoc} */ - @Override public Collection<R> reduce(List<ComputeJobResult> res) throws IgniteCheckedException { - return F.jobResults(res); - } - } - - /** - */ - private class T10<T, R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private IgniteClosure<T, R1> job; - - /** */ - private Collection<? extends T> args; - - /** */ - private IgniteReducer<R1, R2> rdc; - - /** */ - @IgniteLoadBalancerResource - private ComputeLoadBalancer lb; - - /** - * @param job Job. - * @param args Job arguments. - * @param rdc Reducer. - */ - private T10(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc) { - super(U.peerDeployAware(job)); - - this.job = job; - this.args = args; - this.rdc = rdc; - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - Map<ComputeJob, ClusterNode> map = new HashMap<>(args.size(), 1); - - JobMapper mapper = new JobMapper(map); - - for (T jobArg : args) { - ComputeJob job = job(this.job, jobArg); - - mapper.map(job, lb.getBalancedNode(job, null)); - } - - return map; - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) - throws IgniteCheckedException { - ComputeJobResultPolicy resPlc = super.result(res, rcvd); - - if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1) res.getData())) - resPlc = REDUCE; // If reducer returned false - reduce right away. - - return resPlc; - } - - /** {@inheritDoc} */ - @Override public R2 reduce(List<ComputeJobResult> res) throws IgniteCheckedException { - return rdc.reduce(); - } - } - - /** - */ - private class T11<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private final IgniteClosure<T, R> job; - - /** */ - private final T arg; - - /** - * @param job Job. - * @param arg Job argument. - * @param nodes Collection of nodes. - */ - private T11(IgniteClosure<T, R> job, @Nullable T arg, Collection<ClusterNode> nodes) { - super(U.peerDeployAware(job)); - - this.job = job; - this.arg = arg; - } - - /** {@inheritDoc} */ - @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) - throws IgniteCheckedException { - if (F.isEmpty(subgrid)) - return Collections.emptyMap(); - - Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 1); - - JobMapper mapper = new JobMapper(map); - - for (ClusterNode n : subgrid) - mapper.map(job(job, this.arg), n); - - return map; - } - - /** {@inheritDoc} */ - @Override public Collection<R> reduce(List<ComputeJobResult> res) { - return F.jobResults(res); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java deleted file mode 100644 index d7cb935..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.apache.ignite.compute.*; - -/** - * Job adapter implementing {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware} on top of {@link ComputeJobAdapter}; it declares no job logic of its own. - */ -public abstract class GridMasterLeaveAwareComputeJobAdapter extends ComputeJobAdapter - implements ComputeJobMasterLeaveAware { - /** Serial version UID. */ - private static final long serialVersionUID = 0L; - - /** - * No-arg constructor for subclasses. - */ - protected GridMasterLeaveAwareComputeJobAdapter() { - // No-op. 
- } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridPeerDeployAwareTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridPeerDeployAwareTaskAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridPeerDeployAwareTaskAdapter.java deleted file mode 100644 index 7a014f4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/GridPeerDeployAwareTaskAdapter.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.apache.ignite.compute.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -/** - * Peer deployment aware task adapter. 
- */ -public abstract class GridPeerDeployAwareTaskAdapter<T, R> extends ComputeTaskAdapter<T, R> - implements GridPeerDeployAware { - /** Serial version UID. */ - private static final long serialVersionUID = 0L; - - /** Peer deploy aware class; transient, and when {@code null} it is detected on demand via {@code U.detectPeerDeployAware(this)}. */ - private transient GridPeerDeployAware pda; - - /** - * Constructor that receives deployment information for task. - * - * @param pda Deployment information, may be {@code null}. - */ - protected GridPeerDeployAwareTaskAdapter(@Nullable GridPeerDeployAware pda) { - this.pda = pda; - } - - /** {@inheritDoc} */ - @Override public Class<?> deployClass() { - if (pda == null) - pda = U.detectPeerDeployAware(this); - - return pda.deployClass(); - } - - /** {@inheritDoc} */ - @Override public ClassLoader classLoader() { - if (pda == null) - pda = U.detectPeerDeployAware(this); - - return pda.classLoader(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/package.html deleted file mode 100644 index cbdb317..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/closure/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - TODO. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousHandler.java deleted file mode 100644 index 945bd90..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousHandler.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Continuous routine handler; being {@link Externalizable}, implementations also provide {@code writeExternal}/{@code readExternal}. - */ -public interface GridContinuousHandler extends Externalizable { - /** - * Registers listener. - * - * @param nodeId ID of the node that started routine. - * @param routineId Routine ID. - * @param ctx Kernal context. - * @return Whether listener was actually registered. - * @throws IgniteCheckedException In case of error. - */ - public boolean register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; - - /** - * Callback called after listener is registered and acknowledgement is sent. - * - * @param routineId Routine ID. - * @param ctx Kernal context. - */ - public void onListenerRegistered(UUID routineId, GridKernalContext ctx); - - /** - * Unregisters listener. - * - * @param routineId Routine ID. - * @param ctx Kernal context. - */ - public void unregister(UUID routineId, GridKernalContext ctx); - - /** - * Notifies local callback. - * - * @param nodeId ID of the node where notification came from. - * @param routineId Routine ID. - * @param objs Notification objects. - * @param ctx Kernal context. - */ - public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx); - - /** - * Deploys and marshals inner objects (called only if peer deployment is enabled). - * - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. - */ - public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException; - - /** - * Unmarshals inner objects (called only if peer deployment is enabled). - * - * @param nodeId Sender node ID. - * @param ctx Kernal context. - * @throws IgniteCheckedException In case of error. 
- */ - public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException; - - /** - * @return Topic for ordered notifications. If {@code null}, notifications - * will be sent in non-ordered messages. - */ - @Nullable public Object orderedTopic(); - - /** - * @return {@code True} if this handler is for events. - */ - public boolean isForEvents(); - - /** - * @return {@code True} if this handler is for messaging. - */ - public boolean isForMessaging(); - - /** - * @return {@code True} if this handler is for continuous queries. - */ - public boolean isForQuery(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java deleted file mode 100644 index f295d00..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessage.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.apache.ignite.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.tostring.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.continuous.GridContinuousMessageType.*; - -/** - * Continuous processor message. - */ -public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { - /** Serial version UID. */ - private static final long serialVersionUID = 0L; - - /** Message type. */ - private GridContinuousMessageType type; - - /** Routine ID. */ - private UUID routineId; - - /** Optional message data; {@code writeTo}/{@code readFrom} transfer only {@code dataBytes}, not this field. */ - @GridToStringInclude - @GridDirectTransient - private Object data; - - /** Serialized message data. */ - private byte[] dataBytes; - - /** Future ID for synchronous event notifications. */ - private IgniteUuid futId; - - /** - * Required by {@link Externalizable}. - */ - public GridContinuousMessage() { - // No-op. - } - - /** - * @param type Message type. - * @param routineId Routine ID (asserted non-null unless {@code type} is {@code MSG_EVT_ACK}). - * @param futId Future ID. - * @param data Optional message data. - */ - GridContinuousMessage(GridContinuousMessageType type, - @Nullable UUID routineId, - @Nullable IgniteUuid futId, - @Nullable Object data) { - assert type != null; - assert routineId != null || type == MSG_EVT_ACK; - - this.type = type; - this.routineId = routineId; - this.futId = futId; - this.data = data; - } - - /** - * @return Message type. - */ - public GridContinuousMessageType type() { - return type; - } - - /** - * @return Routine ID. - */ - public UUID routineId() { - return routineId; - } - - /** - * @return Message data. - */ - @SuppressWarnings("unchecked") - public <T> T data() { - return (T)data; - } - - /** - * @param data Message data. 
- */ - public void data(Object data) { - this.data = data; - } - - /** - * @return Serialized message data. - */ - public byte[] dataBytes() { - return dataBytes; - } - - /** - * @param dataBytes Serialized message data. - */ - public void dataBytes(byte[] dataBytes) { - this.dataBytes = dataBytes; - } - - /** - * @return Future ID for synchronous event notification. - */ - @Nullable public IgniteUuid futureId() { - return futId; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) - @Override public GridTcpCommunicationMessageAdapter clone() { - GridContinuousMessage _clone = new GridContinuousMessage(); - - clone0(_clone); - - return _clone; - } - - /** {@inheritDoc} */ - @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) { - GridContinuousMessage clone = (GridContinuousMessage)msg; - - clone.type = type; - clone.routineId = routineId; - clone.futId = futId; - clone.data = data; - clone.dataBytes = dataBytes; - } - - /** {@inheritDoc} Writes {@code dataBytes}, {@code futId}, {@code routineId}, {@code type} in that order, switching on {@code commState.idx} to skip fields already written. */ - @SuppressWarnings("fallthrough") - @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); - - if (!commState.typeWritten) { - if (!commState.putByte(directType())) - return false; - - commState.typeWritten = true; - } - - switch (commState.idx) { - case 0: - if (!commState.putByteArray(dataBytes)) - return false; - - commState.idx++; - - case 1: - if (!commState.putGridUuid(futId)) - return false; - - commState.idx++; - - case 2: - if (!commState.putUuid(routineId)) - return false; - - commState.idx++; - - case 3: - if (!commState.putEnum(type)) - return false; - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} Reads fields in the same order {@code writeTo} writes them, switching on {@code commState.idx} to skip fields already read. */ - @SuppressWarnings("fallthrough") - @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - - switch (commState.idx) { - case 0: - byte[] dataBytes0 = commState.getByteArray(); - - if (dataBytes0 == BYTE_ARR_NOT_READ) - return false; - - dataBytes = dataBytes0; - - 
commState.idx++; - - case 1: - IgniteUuid futId0 = commState.getGridUuid(); - - if (futId0 == GRID_UUID_NOT_READ) - return false; - - futId = futId0; - - commState.idx++; - - case 2: - UUID routineId0 = commState.getUuid(); - - if (routineId0 == UUID_NOT_READ) - return false; - - routineId = routineId0; - - commState.idx++; - - case 3: - if (buf.remaining() < 1) - return false; - - byte type0 = commState.getByte(); - - type = fromOrdinal(type0); - - commState.idx++; - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 60; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridContinuousMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java deleted file mode 100644 index e72c175..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/continuous/GridContinuousMessageType.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.continuous; - -import org.jetbrains.annotations.*; - -/** - * Continuous processor message types. - */ -enum GridContinuousMessageType { - /** Consume start request. */ - MSG_START_REQ, - - /** Consume start acknowledgement. */ - MSG_START_ACK, - - /** Consume stop request. */ - MSG_STOP_REQ, - - /** Consume stop acknowledgement. */ - MSG_STOP_ACK, - - /** Remote event notification. */ - MSG_EVT_NOTIFICATION, - - /** Event notification acknowledgement for synchronous events. */ - MSG_EVT_ACK; - - /** Enumerated values, obtained once from {@code values()} and indexed by {@link #fromOrdinal(byte)}. */ - private static final GridContinuousMessageType[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value, or {@code null} if {@code ord} is negative or not less than the number of values. - */ - @Nullable public static GridContinuousMessageType fromOrdinal(byte ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -}
