http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java new file mode 100644 index 0000000..308316d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java @@ -0,0 +1,1744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.closure; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.worker.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.compute.ComputeJobResultPolicy.*; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; + +/** + * + */ +public class GridClosureProcessor extends GridProcessorAdapter { + /** */ + private final Executor sysPool; + + /** */ + private final Executor pubPool; + + /** */ + private final Executor ggfsPool; + + /** Lock to control execution after stop. */ + private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); + + /** Workers count. */ + private final LongAdder workersCnt = new LongAdder(); + + /** + * @param ctx Kernal context. 
+ */ + public GridClosureProcessor(GridKernalContext ctx) { + super(ctx); + + sysPool = ctx.config().getSystemExecutorService(); + pubPool = ctx.config().getExecutorService(); + ggfsPool = ctx.config().getGgfsExecutorService(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + if (log.isDebugEnabled()) + log.debug("Started closure processor."); + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void onKernalStop(boolean cancel) { + busyLock.writeLock(); + + boolean interrupted = Thread.interrupted(); + + while (workersCnt.sum() != 0) { + try { + Thread.sleep(200); + } + catch (InterruptedException ignored) { + interrupted = true; + } + } + + if (interrupted) + Thread.currentThread().interrupt(); + + if (log.isDebugEnabled()) + log.debug("Stopped closure processor."); + } + + /** + * @throws IllegalStateException If grid is stopped. + */ + private void enterBusy() throws IllegalStateException { + if (!busyLock.tryReadLock()) + throw new IllegalStateException("Closure processor cannot be used on stopped grid: " + ctx.gridName()); + } + + /** + * Unlocks busy lock. + */ + private void leaveBusy() { + busyLock.readUnlock(); + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @return Task execution future. + */ + public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs, + @Nullable Collection<ClusterNode> nodes) { + return runAsync(mode, jobs, nodes, false); + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @param sys If {@code true}, then system pool will be used. + * @return Task execution future. + */ + public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? 
extends Runnable> jobs, + @Nullable Collection<ClusterNode> nodes, boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (F.isEmpty(jobs)) + return new GridFinishedFuture(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T1(mode, jobs), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. + * @return Task execution future. + */ + public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, + @Nullable Collection<ClusterNode> nodes) { + return runAsync(mode, job, nodes, false); + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. + * @param sys If {@code true}, then system pool will be used. + * @return Task execution future. + */ + public IgniteFuture<?> runAsync(GridClosureCallMode mode, @Nullable Runnable job, + @Nullable Collection<ClusterNode> nodes, boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (job == null) + return new GridFinishedFuture(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T2(mode, job), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * Maps {@link Runnable} jobs to specified nodes based on distribution mode. + * + * @param mode Distribution mode. + * @param jobs Closures to map. + * @param nodes Grid nodes. + * @param lb Load balancer. + * @throws IgniteCheckedException Thrown in case of any errors. + * @return Mapping. + */ + private Map<ComputeJob, ClusterNode> absMap(GridClosureCallMode mode, Collection<? 
extends Runnable> jobs, + Collection<ClusterNode> nodes, ComputeLoadBalancer lb) throws IgniteCheckedException { + assert mode != null; + assert jobs != null; + assert nodes != null; + assert lb != null; + + if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1); + + JobMapper mapper = new JobMapper(map); + + switch (mode) { + case BROADCAST: { + for (ClusterNode n : nodes) + for (Runnable r : jobs) + mapper.map(job(r), n); + + break; + } + + case BALANCE: { + for (Runnable r : jobs) { + ComputeJob job = job(r); + + mapper.map(job, lb.getBalancedNode(job, null)); + } + + break; + } + } + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * Maps {@link Callable} jobs to specified nodes based on distribution mode. + * + * @param mode Distribution mode. + * @param jobs Closures to map. + * @param nodes Grid nodes. + * @param lb Load balancer. + * @throws IgniteCheckedException Thrown in case of any errors. + * @return Mapping. + */ + private <R> Map<ComputeJob, ClusterNode> outMap(GridClosureCallMode mode, + Collection<? extends Callable<R>> jobs, Collection<ClusterNode> nodes, ComputeLoadBalancer lb) + throws IgniteCheckedException { + assert mode != null; + assert jobs != null; + assert nodes != null; + assert lb != null; + + if (!F.isEmpty(jobs) && !F.isEmpty(nodes)) { + Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1); + + JobMapper mapper = new JobMapper(map); + + switch (mode) { + case BROADCAST: { + for (ClusterNode n : nodes) + for (Callable<R> c : jobs) + mapper.map(job(c), n); + + break; + } + + case BALANCE: { + for (Callable<R> c : jobs) { + ComputeJob job = job(c); + + mapper.map(job, lb.getBalancedNode(job, null)); + } + + break; + } + } + + return map; + } + else + return Collections.emptyMap(); + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param rdc Reducer. + * @param nodes Grid nodes. + * @param <R1> Type. 
+ * @param <R2> Type. + * @return Reduced result. + */ + public <R1, R2> IgniteFuture<R2> forkjoinAsync(GridClosureCallMode mode, + @Nullable Collection<? extends Callable<R1>> jobs, + @Nullable IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { + assert mode != null; + + enterBusy(); + + try { + if (F.isEmpty(jobs) || rdc == null) + return new GridFinishedFuture<>(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T3<>(mode, jobs, rdc), null); + } + finally { + leaveBusy(); + } + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> IgniteFuture<Collection<R>> callAsync( + GridClosureCallMode mode, + @Nullable Collection<? extends Callable<R>> jobs, + @Nullable Collection<ClusterNode> nodes) { + return callAsync(mode, jobs, nodes, false); + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @param sys If {@code true}, then system pool will be used. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> IgniteFuture<Collection<R>> callAsync(GridClosureCallMode mode, + @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, + boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (F.isEmpty(jobs)) + return new GridFinishedFuture<>(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T6<>(mode, jobs), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. 
+ * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, + @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) { + return callAsync(mode, job, nodes, false); + } + + /** + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + * @param nodes Grid nodes. + * @return Job future. + */ + public <R> IgniteFuture<R> affinityCall(@Nullable String cacheName, Object affKey, Callable<R> job, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + // In case cache key is passed instead of affinity key. + final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T5<>(cacheName, affKey0, job), null, false); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + finally { + leaveBusy(); + } + } + + /** + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + * @param nodes Grid nodes. + * @return Job future. + */ + public IgniteFuture<?> affinityRun(@Nullable String cacheName, Object affKey, Runnable job, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + // In case cache key is passed instead of affinity key. + final Object affKey0 = ctx.affinity().affinityKey(cacheName, affKey); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T4(cacheName, affKey0, job), null, false); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(ctx, e); + } + finally { + leaveBusy(); + } + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. 
+ * @param sys If {@code true}, then system pool will be used. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> IgniteFuture<R> callAsyncNoFailover(GridClosureCallMode mode, @Nullable Callable<R> job, + @Nullable Collection<ClusterNode> nodes, boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (job == null) + return new GridFinishedFuture<>(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_NO_FAILOVER, true); + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T7<>(mode, job), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * @param mode Distribution mode. + * @param jobs Closures to execute. + * @param nodes Grid nodes. + * @param sys If {@code true}, then system pool will be used. + * @param <R> Type. + * @return Grid future for collection of closure results. + */ + public <R> IgniteFuture<Collection<R>> callAsyncNoFailover(GridClosureCallMode mode, + @Nullable Collection<? extends Callable<R>> jobs, @Nullable Collection<ClusterNode> nodes, + boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (F.isEmpty(jobs)) + return new GridFinishedFuture<>(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_NO_FAILOVER, true); + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T6<>(mode, jobs), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * @param mode Distribution mode. + * @param job Closure to execute. + * @param nodes Grid nodes. + * @param sys If {@code true}, then system pool will be used. + * @param <R> Type. + * @return Grid future for collection of closure results. 
+ */ + public <R> IgniteFuture<R> callAsync(GridClosureCallMode mode, + @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes, boolean sys) { + assert mode != null; + + enterBusy(); + + try { + if (job == null) + return new GridFinishedFuture<>(ctx); + + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T7<>(mode, job), null, sys); + } + finally { + leaveBusy(); + } + } + + /** + * @param job Job closure. + * @param arg Optional job argument. + * @param nodes Grid nodes. + * @return Grid future for execution result. + */ + public <T, R> IgniteFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T8<>(job, arg), null, false); + } + finally { + leaveBusy(); + } + } + + /** + * @param job Job closure. + * @param arg Optional job argument. + * @param nodes Grid nodes. + * @return Grid future for execution result. + */ + public <T, R> IgniteFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T11<>(job, arg, nodes), null, false); + } + finally { + leaveBusy(); + } + } + + /** + * @param job Job closure. + * @param arg Optional job argument. + * @param nodes Grid nodes. + * @return Grid future for execution result. 
+ */ + public <T, R> IgniteFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + ctx.task().setThreadContext(TC_NO_FAILOVER, true); + + return ctx.task().execute(new T11<>(job, arg, nodes), null, false); + } + finally { + leaveBusy(); + } + } + + /** + * @param job Job closure. + * @param args Job arguments. + * @param nodes Grid nodes. + * @return Grid future for execution result. + */ + public <T, R> IgniteFuture<Collection<R>> callAsync(IgniteClosure<T, R> job, @Nullable Collection<? extends T> args, + @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T9<>(job, args), null, false); + } + finally { + leaveBusy(); + } + } + + /** + * @param job Job closure. + * @param args Job arguments. + * @param rdc Reducer. + * @param nodes Grid nodes. + * @return Grid future for execution result. + */ + public <T, R1, R2> IgniteFuture<R2> callAsync(IgniteClosure<T, R1> job, + Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) { + enterBusy(); + + try { + if (F.isEmpty(nodes)) + return new GridFinishedFuture<>(ctx, U.emptyTopologyException()); + + ctx.task().setThreadContext(TC_SUBGRID, nodes); + + return ctx.task().execute(new T10<>(job, args, rdc), null, false); + } + finally { + leaveBusy(); + } + } + + /** + * Gets pool by execution policy. + * + * @param plc Whether to get system or public pool. + * @return Requested worker pool. 
*/
    private Executor pool(GridClosurePolicy plc) {
        switch (plc) {
            case PUBLIC_POOL:
                return pubPool;

            case SYSTEM_POOL:
                return sysPool;

            case GGFS_POOL:
                return ggfsPool;

            default:
                throw new IllegalArgumentException("Invalid closure execution policy: " + plc);
        }
    }

    /**
     * Gets pool name by execution policy; the name is used in log messages only.
     *
     * @param plc Policy to choose executor pool.
     * @return Pool name: {@code "public"}, {@code "system"} or {@code "ggfs"}.
     * @throws IllegalArgumentException If the policy is not one of the three known values.
     */
    private String poolName(GridClosurePolicy plc) {
        switch (plc) {
            case PUBLIC_POOL:
                return "public";

            case SYSTEM_POOL:
                return "system";

            case GGFS_POOL:
                return "ggfs";

            default:
                throw new IllegalArgumentException("Invalid closure execution policy: " + plc);
        }
    }

    /**
     * Translates the boolean flag into a pool policy and delegates to
     * {@link #runLocal(Runnable, GridClosurePolicy)}.
     *
     * @param c Closure to execute.
     * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used.
     * @return Future.
     * @throws IgniteCheckedException Thrown in case of any errors.
     */
    private IgniteFuture<?> runLocal(@Nullable final Runnable c, boolean sys) throws IgniteCheckedException {
        GridClosurePolicy plc = sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL;

        return runLocal(c, plc);
    }

    /**
     * @param c Closure to execute.
     * @param plc Whether to run on system or public pool.
     * @return Future.
     * @throws IgniteCheckedException Thrown in case of any errors.
     */
    private IgniteFuture<?> runLocal(@Nullable final Runnable c, GridClosurePolicy plc) throws IgniteCheckedException {
        // Nothing to run: finish immediately (diamond form, as in callLocal(), instead of a raw type).
        if (c == null)
            return new GridFinishedFuture<>(ctx);

        enterBusy();

        try {
            // Inject only if needed.
+ if (!(c instanceof GridPlainRunnable)) + ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c); + + final ClassLoader ldr = Thread.currentThread().getContextClassLoader(); + + final GridWorkerFuture fut = new GridWorkerFuture(ctx); + + workersCnt.increment(); + + GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { + @Override protected void body() { + try { + if (ldr != null) + U.wrapThreadLoader(ldr, c); + else + c.run(); + + fut.onDone(); + } + catch (Throwable e) { + if (e instanceof Error) + U.error(log, "Closure execution failed with error.", e); + + fut.onDone(U.cast(e)); + } + finally { + workersCnt.decrement(); + } + } + }; + + fut.setWorker(w); + + try { + pool(plc).execute(w); + } + catch (RejectedExecutionException e) { + U.error(log, "Failed to execute worker due to execution rejection " + + "(increase upper bound on " + poolName(plc) + " executor service).", e); + + w.run(); + } + + return fut; + } + finally { + leaveBusy(); + } + } + + /** + * Executes closure on system pool. Companion to {@link #runLocal(Runnable, boolean)} but + * in case of rejected execution re-runs the closure in the current thread (blocking). + * + * @param c Closure to execute. + * @return Future. + */ + public IgniteFuture<?> runLocalSafe(Runnable c) { + return runLocalSafe(c, true); + } + + /** + * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs + * the closure in the current thread (blocking). + * + * @param c Closure to execute. + * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. + * @return Future. + */ + public IgniteFuture<?> runLocalSafe(Runnable c, boolean sys) { + return runLocalSafe(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + } + + /** + * Companion to {@link #runLocal(Runnable, boolean)} but in case of rejected execution re-runs + * the closure in the current thread (blocking). 
+ * + * @param c Closure to execute. + * @param plc Policy to choose executor pool. + * @return Future. + */ + public IgniteFuture<?> runLocalSafe(Runnable c, GridClosurePolicy plc) { + try { + return runLocal(c, plc); + } + catch (Throwable e) { + if (e instanceof Error) + U.error(log, "Closure execution failed with error.", e); + + // If execution was rejected - rerun locally. + if (e.getCause() instanceof RejectedExecutionException) { + U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc + + ", closure=" + c + ']'); + + try { + c.run(); + + return new GridFinishedFuture(ctx); + } + catch (Throwable t) { + if (t instanceof Error) + U.error(log, "Closure execution failed with error.", t); + + return new GridFinishedFuture(ctx, U.cast(t)); + } + } + // If failed for other reasons - return error future. + else + return new GridFinishedFuture(ctx, U.cast(e)); + } + } + + /** + * @param c Closure to execute. + * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. + * @return Future. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, boolean sys) throws IgniteCheckedException { + return callLocal(c, sys ? GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + } + + /** + * @param c Closure to execute. + * @param plc Whether to run on system or public pool. + * @param <R> Type of closure return value. + * @return Future. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + private <R> IgniteFuture<R> callLocal(@Nullable final Callable<R> c, GridClosurePolicy plc) throws IgniteCheckedException { + if (c == null) + return new GridFinishedFuture<>(ctx); + + enterBusy(); + + try { + // Inject only if needed. 
+ if (!(c instanceof GridPlainCallable)) + ctx.resource().inject(ctx.deploy().getDeployment(c.getClass().getName()), c.getClass(), c); + + final ClassLoader ldr = Thread.currentThread().getContextClassLoader(); + + final GridWorkerFuture<R> fut = new GridWorkerFuture<>(ctx); + + workersCnt.increment(); + + GridWorker w = new GridWorker(ctx.gridName(), "closure-proc-worker", log) { + @Override protected void body() { + try { + if (ldr != null) + fut.onDone(U.wrapThreadLoader(ldr, c)); + else + fut.onDone(c.call()); + } + catch (Throwable e) { + if (e instanceof Error) + U.error(log, "Closure execution failed with error.", e); + + fut.onDone(U.cast(e)); + } + finally { + workersCnt.decrement(); + } + } + }; + + fut.setWorker(w); + + try { + pool(plc).execute(w); + } + catch (RejectedExecutionException e) { + U.error(log, "Failed to execute worker due to execution rejection " + + "(increase upper bound on " + poolName(plc) + " executor service).", e); + + w.run(); + } + + return fut; + } + finally { + leaveBusy(); + } + } + + /** + * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)} + * but in case of rejected execution re-runs the closure in the current thread (blocking). + * + * @param c Closure to execute. + * @return Future. + */ + public <R> IgniteFuture<R> callLocalSafe(Callable<R> c) { + return callLocalSafe(c, true); + } + + /** + * Executes closure on system pool. Companion to {@link #callLocal(Callable, boolean)} + * but in case of rejected execution re-runs the closure in the current thread (blocking). + * + * @param c Closure to execute. + * @param sys If {@code true}, then system pool will be used, otherwise public pool will be used. + * @return Future. + */ + public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, boolean sys) { + return callLocalSafe(c, sys ? 
GridClosurePolicy.SYSTEM_POOL : GridClosurePolicy.PUBLIC_POOL); + } + + /** + * Companion to {@link #callLocal(Callable, boolean)} but in case of rejected execution re-runs + * the closure in the current thread (blocking). + * + * @param c Closure to execute. + * @param plc Policy to choose executor pool. + * @return Future. + */ + public <R> IgniteFuture<R> callLocalSafe(Callable<R> c, GridClosurePolicy plc) { + try { + return callLocal(c, plc); + } + catch (IgniteCheckedException e) { + // If execution was rejected - rerun locally. + if (e.getCause() instanceof RejectedExecutionException) { + U.warn(log, "Closure execution has been rejected (will execute in the same thread) [plc=" + plc + + ", closure=" + c + ']'); + + try { + return new GridFinishedFuture<>(ctx, c.call()); + } + // If failed again locally - return error future. + catch (Exception e2) { + return new GridFinishedFuture<>(ctx, U.cast(e2)); + } + } + // If failed for other reasons - return error future. + else + return new GridFinishedFuture<>(ctx, U.cast(e)); + } + } + + /** + * Converts given closure with arguments to grid job. + * @param job Job. + * @param arg Optional argument. + * @return Job. + */ + @SuppressWarnings("IfMayBeConditional") + private <T, R> ComputeJob job(final IgniteClosure<T, R> job, @Nullable final T arg) { + A.notNull(job, "job"); + + if (job instanceof ComputeJobMasterLeaveAware) { + return new GridMasterLeaveAwareComputeJobAdapter() { + @Nullable @Override public Object execute() { + return job.apply(arg); + } + + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses); + } + }; + } + else { + return new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + return job.apply(arg); + } + }; + } + } + + /** + * Converts given closure to a grid job. + * + * @param c Closure to convert to grid job. + * @return Grid job made out of closure. 
+ */ + @SuppressWarnings("IfMayBeConditional") + private ComputeJob job(final Callable<?> c) { + A.notNull(c, "job"); + + if (c instanceof ComputeJobMasterLeaveAware) { + return new GridMasterLeaveAwareComputeJobAdapter() { + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + }; + } + else { + return new ComputeJobAdapter() { + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + }; + } + } + + /** + * Converts given closure to a grid job. + * + * @param c Closure to convert to grid job. + * @param cacheName Cache name. + * @param affKey Affinity key. + * @return Grid job made out of closure. + */ + @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"}) + private ComputeJob job(final Callable<?> c, @Nullable final String cacheName, final Object affKey) { + A.notNull(c, "job"); + + if (c instanceof ComputeJobMasterLeaveAware) { + return new GridMasterLeaveAwareComputeJobAdapter() { + /** */ + @GridCacheName + private final String cn = cacheName; + + /** */ + @GridCacheAffinityKeyMapped + private final Object ak = affKey; + + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)c).onMasterNodeLeft(ses); + } + }; + } + else { + return new ComputeJobAdapter() { + /** */ + @GridCacheName + private final String cn = cacheName; + + /** */ + @GridCacheAffinityKeyMapped + private final Object ak = affKey; + + @Override public Object execute() { + try { + return c.call(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } 
+ }; + } + } + + /** + * Converts given closure to a grid job. + * + * @param r Closure to convert to grid job. + * @return Grid job made out of closure. + */ + @SuppressWarnings("IfMayBeConditional") + private static ComputeJob job(final Runnable r) { + A.notNull(r, "job"); + + if (r instanceof ComputeJobMasterLeaveAware) { + return new GridMasterLeaveAwareComputeJobAdapter() { + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + }; + } + else { + return new ComputeJobAdapter() { + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + }; + } + } + + /** + * Converts given closure to a grid job. + * + * @param r Closure to convert to grid job. + * @param cacheName Cache name. + * @param affKey Affinity key. + * @return Grid job made out of closure. + */ + @SuppressWarnings(value = {"IfMayBeConditional", "UnusedDeclaration"}) + private ComputeJob job(final Runnable r, @Nullable final String cacheName, final Object affKey) { + A.notNull(r, "job"); + + if (r instanceof ComputeJobMasterLeaveAware) { + return new GridMasterLeaveAwareComputeJobAdapter() { + /** */ + @GridCacheName + private final String cn = cacheName; + + /** */ + @GridCacheAffinityKeyMapped + private final Object ak = affKey; + + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + + @Override public void onMasterNodeLeft(ComputeTaskSession ses) throws IgniteCheckedException { + ((ComputeJobMasterLeaveAware)r).onMasterNodeLeft(ses); + } + }; + } + else { + return new ComputeJobAdapter() { + /** */ + @GridCacheName + private final String cn = cacheName; + + /** */ + @GridCacheAffinityKeyMapped + private final Object ak = affKey; + + @Nullable @Override public Object execute() { + r.run(); + + return null; + } + }; + } + } + + /** */ + private class JobMapper { + /** 
*/ + private final Map<ComputeJob, ClusterNode> map; + + /** */ + private boolean hadLocNode; + + /** + * @param map Jobs map. + */ + private JobMapper(Map<ComputeJob, ClusterNode> map) { + assert map != null; + assert map.isEmpty(); + + this.map = map; + } + + /** + * @param job Job. + * @param node Node. + * @throws IgniteCheckedException In case of error. + */ + public void map(ComputeJob job, ClusterNode node) throws IgniteCheckedException { + assert job != null; + assert node != null; + + if (ctx.localNodeId().equals(node.id())) { + if (hadLocNode) { + IgniteMarshaller marsh = ctx.config().getMarshaller(); + + job = marsh.unmarshal(marsh.marshal(job), null); + } + else + hadLocNode = true; + } + + map.put(job, node); + } + } + + /** + * No-reduce task adapter. + */ + private abstract static class TaskNoReduceAdapter<T> extends GridPeerDeployAwareTaskAdapter<T, Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param pda Peer deploy aware instance. + */ + protected TaskNoReduceAdapter(@Nullable GridPeerDeployAware pda) { + super(pda); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * Task that is free of dragged in enclosing context for the method + * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}. + */ + private class T1 extends TaskNoReduceAdapter<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** */ + private IgniteBiTuple<GridClosureCallMode, Collection<? extends Runnable>> t; + + /** + * @param mode Call mode. + * @param jobs Collection of jobs. + */ + private T1(GridClosureCallMode mode, Collection<? extends Runnable> jobs) { + super(U.peerDeployAware0(jobs)); + + t = F.< + GridClosureCallMode, + Collection<? 
extends Runnable> + >t(mode, jobs); + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + return absMap(t.get1(), t.get2(), subgrid, lb); + } + } + + /** + * Task that is free of dragged in enclosing context for the method + * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}. + */ + private class T2 extends TaskNoReduceAdapter<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** */ + private IgniteBiTuple<GridClosureCallMode, Runnable> t; + + /** + * @param mode Call mode. + * @param job Job. + */ + private T2(GridClosureCallMode mode, Runnable job) { + super(U.peerDeployAware(job)); + + t = F.t(mode, job); + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + return absMap(t.get1(), F.asList(t.get2()), subgrid, lb); + } + } + + /** + * Task that is free of dragged in enclosing context for the method + * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection)} + */ + private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** */ + private GridTuple3<GridClosureCallMode, + Collection<? extends Callable<R1>>, + IgniteReducer<R1, R2> + > t; + + /** + * + * @param mode Call mode. + * @param jobs Collection of jobs. + * @param rdc Reducer. + */ + private T3(GridClosureCallMode mode, Collection<? extends Callable<R1>> jobs, IgniteReducer<R1, R2> rdc) { + super(U.peerDeployAware0(jobs)); + + t = F.< + GridClosureCallMode, + Collection<? 
extends Callable<R1>>, + IgniteReducer<R1, R2> + >t(mode, jobs, rdc); + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + return outMap(t.get1(), t.get2(), subgrid, lb); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) + throws IgniteCheckedException { + ComputeJobResultPolicy resPlc = super.result(res, rcvd); + + if (res.getException() == null && resPlc != FAILOVER && !t.get3().collect((R1)res.getData())) + resPlc = REDUCE; // If reducer returned false - reduce right away. + + return resPlc; + } + + /** {@inheritDoc} */ + @Override public R2 reduce(List<ComputeJobResult> res) { + return t.get3().reduce(); + } + } + + /** + */ + private class T4 extends TaskNoReduceAdapter<Void> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final String cacheName; + + /** */ + private Object affKey; + + /** */ + private Runnable job; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + */ + private T4(@Nullable String cacheName, Object affKey, Runnable job) { + super(U.peerDeployAware0(job)); + + this.cacheName = cacheName; + this.affKey = affKey; + this.job = job; + } + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + ComputeJob job = job(this.job, cacheName, affKey); + + return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + } + } + + /** + */ + private class T5<R> extends GridPeerDeployAwareTaskAdapter<Void, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final String cacheName; + + /** */ + private Object affKey; + + /** */ + private Callable<R> job; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param cacheName Cache name. + * @param affKey Affinity key. + * @param job Job. + */ + private T5(@Nullable String cacheName, Object affKey, Callable<R> job) { + super(U.peerDeployAware0(job)); + + this.cacheName = cacheName; + this.affKey = affKey; + this.job = job; + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + ComputeJob job = job(this.job, cacheName, affKey); + + return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + } + + /** {@inheritDoc} */ + @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + for (ComputeJobResult r : res) { + if (r.getException() == null) + return r.getData(); + } + + throw new IgniteCheckedException("Failed to find successful job result: " + res); + } + } + + /** + * Task that is free of dragged in enclosing context for the method + * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)} + */ + private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final GridClosureCallMode mode; + + /** */ + private final Collection<? 
extends Callable<R>> jobs; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * + * @param mode Call mode. + * @param jobs Collection of jobs. + */ + private T6( + GridClosureCallMode mode, + Collection<? extends Callable<R>> jobs) { + super(U.peerDeployAware0(jobs)); + + this.mode = mode; + this.jobs = jobs; + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + return outMap(mode, jobs, subgrid, lb); + } + + /** {@inheritDoc} */ + @Override public Collection<R> reduce(List<ComputeJobResult> res) { + return F.jobResults(res); + } + } + + /** + * Task that is free of dragged in enclosing context for the method + * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)} + */ + private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteBiTuple<GridClosureCallMode, Callable<R>> t; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param mode Call mode. + * @param job Job. + */ + private T7(GridClosureCallMode mode, Callable<R> job) { + super(U.peerDeployAware(job)); + + t = F.t(mode, job); + } + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + return outMap(t.get1(), F.asList(t.get2()), subgrid, lb); + } + + /** {@inheritDoc} */ + @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + for (ComputeJobResult r : res) + if (r.getException() == null) + return r.getData(); + + throw new IgniteCheckedException("Failed to find successful job result: " + res); + } + } + + /** + */ + private class T8<T, R> extends GridPeerDeployAwareTaskAdapter<Void, R> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteClosure<T, R> job; + + /** */ + private T arg; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param job Job. + * @param arg Optional job argument. + */ + private T8(IgniteClosure<T, R> job, @Nullable T arg) { + super(U.peerDeployAware(job)); + + this.job = job; + this.arg = arg; + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + ComputeJob job = job(this.job, this.arg); + + return Collections.singletonMap(job, lb.getBalancedNode(job, null)); + } + + /** {@inheritDoc} */ + @Override public R reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + for (ComputeJobResult r : res) + if (r.getException() == null) + return r.getData(); + + throw new IgniteCheckedException("Failed to find successful job result: " + res); + } + } + + /** + */ + private class T9<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteClosure<T, R> job; + + /** */ + private Collection<? extends T> args; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param job Job. + * @param args Job arguments. 
+ */ + private T9(IgniteClosure<T, R> job, Collection<? extends T> args) { + super(U.peerDeployAware(job)); + + this.job = job; + this.args = args; + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + Map<ComputeJob, ClusterNode> map = new HashMap<>(args.size(), 1); + + JobMapper mapper = new JobMapper(map); + + for (T jobArg : args) { + ComputeJob job = job(this.job, jobArg); + + mapper.map(job, lb.getBalancedNode(job, null)); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Collection<R> reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + return F.jobResults(res); + } + } + + /** + */ + private class T10<T, R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteClosure<T, R1> job; + + /** */ + private Collection<? extends T> args; + + /** */ + private IgniteReducer<R1, R2> rdc; + + /** */ + @IgniteLoadBalancerResource + private ComputeLoadBalancer lb; + + /** + * @param job Job. + * @param args Job arguments. + * @param rdc Reducer. + */ + private T10(IgniteClosure<T, R1> job, Collection<? extends T> args, IgniteReducer<R1, R2> rdc) { + super(U.peerDeployAware(job)); + + this.job = job; + this.args = args; + this.rdc = rdc; + } + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + Map<ComputeJob, ClusterNode> map = new HashMap<>(args.size(), 1); + + JobMapper mapper = new JobMapper(map); + + for (T jobArg : args) { + ComputeJob job = job(this.job, jobArg); + + mapper.map(job, lb.getBalancedNode(job, null)); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) + throws IgniteCheckedException { + ComputeJobResultPolicy resPlc = super.result(res, rcvd); + + if (res.getException() == null && resPlc != FAILOVER && !rdc.collect((R1) res.getData())) + resPlc = REDUCE; // If reducer returned false - reduce right away. + + return resPlc; + } + + /** {@inheritDoc} */ + @Override public R2 reduce(List<ComputeJobResult> res) throws IgniteCheckedException { + return rdc.reduce(); + } + } + + /** + */ + private class T11<T, R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final IgniteClosure<T, R> job; + + /** */ + private final T arg; + + /** + * @param job Job. + * @param arg Job argument. + * @param nodes Collection of nodes. + */ + private T11(IgniteClosure<T, R> job, @Nullable T arg, Collection<ClusterNode> nodes) { + super(U.peerDeployAware(job)); + + this.job = job; + this.arg = arg; + } + + /** {@inheritDoc} */ + @Override public Map<? 
extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, @Nullable Void arg) + throws IgniteCheckedException { + if (F.isEmpty(subgrid)) + return Collections.emptyMap(); + + Map<ComputeJob, ClusterNode> map = new HashMap<>(subgrid.size(), 1); + + JobMapper mapper = new JobMapper(map); + + for (ClusterNode n : subgrid) + mapper.map(job(job, this.arg), n); + + return map; + } + + /** {@inheritDoc} */ + @Override public Collection<R> reduce(List<ComputeJobResult> res) { + return F.jobResults(res); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java new file mode 100644 index 0000000..e28784e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridMasterLeaveAwareComputeJobAdapter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import org.apache.ignite.compute.*; + +/** + * Job adapter implementing {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware}. + */ +public abstract class GridMasterLeaveAwareComputeJobAdapter extends ComputeJobAdapter + implements ComputeJobMasterLeaveAware { + /** */ + private static final long serialVersionUID = 0L; + + /** + * No-arg constructor. 
+ */ + protected GridMasterLeaveAwareComputeJobAdapter() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java new file mode 100644 index 0000000..79768a7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridPeerDeployAwareTaskAdapter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.closure; + +import org.apache.ignite.compute.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +/** + * Peer deployment aware task adapter. 
+ */ +public abstract class GridPeerDeployAwareTaskAdapter<T, R> extends ComputeTaskAdapter<T, R> + implements GridPeerDeployAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Peer deploy aware class. */ + private transient GridPeerDeployAware pda; + + /** + * Constructor that receives deployment information for task. + * + * @param pda Deployment information. + */ + protected GridPeerDeployAwareTaskAdapter(@Nullable GridPeerDeployAware pda) { + this.pda = pda; + } + + /** {@inheritDoc} */ + @Override public Class<?> deployClass() { + if (pda == null) + pda = U.detectPeerDeployAware(this); + + return pda.deployClass(); + } + + /** {@inheritDoc} */ + @Override public ClassLoader classLoader() { + if (pda == null) + pda = U.detectPeerDeployAware(this); + + return pda.classLoader(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html new file mode 100644 index 0000000..cbdb317 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + TODO. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java new file mode 100644 index 0000000..17c7a0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +/** + * Continuous routine handler. + */ +public interface GridContinuousHandler extends Externalizable { + /** + * Registers listener. + * + * @param nodeId ID of the node that started routine. + * @param routineId Routine ID. + * @param ctx Kernal context. + * @return Whether listener was actually registered. + * @throws IgniteCheckedException In case of error. + */ + public boolean register(UUID nodeId, UUID routineId, GridKernalContext ctx) throws IgniteCheckedException; + + /** + * Callback called after listener is registered and acknowledgement is sent. + * + * @param routineId Routine ID. + * @param ctx Kernal context. + */ + public void onListenerRegistered(UUID routineId, GridKernalContext ctx); + + /** + * Unregisters listener. + * + * @param routineId Routine ID. + * @param ctx Kernal context. + */ + public void unregister(UUID routineId, GridKernalContext ctx); + + /** + * Notifies local callback. + * + * @param nodeId ID of the node where notification came from. + * @param routineId Routine ID. + * @param objs Notification objects. + * @param ctx Kernal context. + */ + public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx); + + /** + * Deploys and marshals inner objects (called only if peer deployment is enabled). + * + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException; + + /** + * Unmarshals inner objects (called only if peer deployment is enabled). + * + * @param nodeId Sender node ID. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. 
+ */ + public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException; + + /** + * @return Topic for ordered notifications. If {@code null}, notifications + * will be sent in non-ordered messages. + */ + @Nullable public Object orderedTopic(); + + /** + * @return {@code True} if for events. + */ + public boolean isForEvents(); + + /** + * @return {@code True} if for messaging. + */ + public boolean isForMessaging(); + + /** + * @return {@code True} if for continuous queries. + */ + public boolean isForQuery(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java new file mode 100644 index 0000000..1c63a8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.tostring.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*; + +/** + * Continuous processor message. + */ +public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Message type. */ + private GridContinuousMessageType type; + + /** Routine ID. */ + private UUID routineId; + + /** Optional message data. */ + @GridToStringInclude + @GridDirectTransient + private Object data; + + /** Serialized message data. */ + private byte[] dataBytes; + + /** Future ID for synchronous event notifications. */ + private IgniteUuid futId; + + /** + * Required by {@link Externalizable}. + */ + public GridContinuousMessage() { + // No-op. + } + + /** + * @param type Message type. + * @param routineId Consume ID. + * @param futId Future ID. + * @param data Optional message data. + */ + GridContinuousMessage(GridContinuousMessageType type, + @Nullable UUID routineId, + @Nullable IgniteUuid futId, + @Nullable Object data) { + assert type != null; + assert routineId != null || type == MSG_EVT_ACK; + + this.type = type; + this.routineId = routineId; + this.futId = futId; + this.data = data; + } + + /** + * @return Message type. + */ + public GridContinuousMessageType type() { + return type; + } + + /** + * @return Consume ID. + */ + public UUID routineId() { + return routineId; + } + + /** + * @return Message data. 
+ */ + @SuppressWarnings("unchecked") + public <T> T data() { + return (T)data; + } + + /** + * @param data Message data. + */ + public void data(Object data) { + this.data = data; + } + + /** + * @return Serialized message data. + */ + public byte[] dataBytes() { + return dataBytes; + } + + /** + * @param dataBytes Serialized message data. + */ + public void dataBytes(byte[] dataBytes) { + this.dataBytes = dataBytes; + } + + /** + * @return Future ID for synchronous event notification. + */ + @Nullable public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridContinuousMessage _clone = new GridContinuousMessage(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) { + GridContinuousMessage clone = (GridContinuousMessage)msg; + + clone.type = type; + clone.routineId = routineId; + clone.futId = futId; + clone.data = data; + clone.dataBytes = dataBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putByteArray(dataBytes)) + return false; + + commState.idx++; + + case 1: + if (!commState.putGridUuid(futId)) + return false; + + commState.idx++; + + case 2: + if (!commState.putUuid(routineId)) + return false; + + commState.idx++; + + case 3: + if (!commState.putEnum(type)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("fallthrough") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: + byte[] dataBytes0 = 
commState.getByteArray(); + + if (dataBytes0 == BYTE_ARR_NOT_READ) + return false; + + dataBytes = dataBytes0; + + commState.idx++; + + case 1: + IgniteUuid futId0 = commState.getGridUuid(); + + if (futId0 == GRID_UUID_NOT_READ) + return false; + + futId = futId0; + + commState.idx++; + + case 2: + UUID routineId0 = commState.getUuid(); + + if (routineId0 == UUID_NOT_READ) + return false; + + routineId = routineId0; + + commState.idx++; + + case 3: + if (buf.remaining() < 1) + return false; + + byte type0 = commState.getByte(); + + type = fromOrdinal(type0); + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 60; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridContinuousMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java new file mode 100644 index 0000000..eb33613 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import org.jetbrains.annotations.*; + +/** + * Continuous processor message types. + */ +enum GridContinuousMessageType { + /** Consume start request. */ + MSG_START_REQ, + + /** Consume start acknowledgement. */ + MSG_START_ACK, + + /** Consume stop request. */ + MSG_STOP_REQ, + + /** Consume stop acknowledgement. */ + MSG_STOP_ACK, + + /** Remote event notification. */ + MSG_EVT_NOTIFICATION, + + /** Event notification acknowledgement for synchronous events. */ + MSG_EVT_ACK; + + /** Enumerated values. */ + private static final GridContinuousMessageType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static GridContinuousMessageType fromOrdinal(byte ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +}
