This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 21f992a IGNITE-14112 Revisit usages of GridClosureProcessor.runLocalSafe and GridClosureProcessor.callLocalSafe methods - Fixes #8743. 21f992a is described below commit 21f992a6958529cafd389505a05ad3506a565858 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> AuthorDate: Mon Feb 8 17:40:13 2021 +0300 IGNITE-14112 Revisit usages of GridClosureProcessor.runLocalSafe and GridClosureProcessor.callLocalSafe methods - Fixes #8743. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../ignite/internal/IgniteSchedulerImpl.java | 4 +- .../processors/cache/CacheGroupContext.java | 3 +- .../processors/cache/GridCacheAdapter.java | 28 ++++++------ .../processors/cache/GridCacheIoManager.java | 3 +- .../processors/cache/GridCacheMvccManager.java | 2 +- .../cache/GridCachePartitionExchangeManager.java | 3 +- .../cache/GridDeferredAckMessageSender.java | 3 +- .../distributed/GridCacheTxRecoveryFuture.java | 3 +- .../cache/distributed/dht/GridDhtCacheAdapter.java | 2 +- .../dht/GridDhtTxAbstractEnlistFuture.java | 3 +- .../dht/GridPartitionedSingleGetFuture.java | 2 +- .../atomic/GridNearAtomicSingleUpdateFuture.java | 5 ++- .../dht/atomic/GridNearAtomicUpdateFuture.java | 5 ++- .../dht/preloader/GridDhtPartitionDemander.java | 5 ++- .../preloader/GridDhtPartitionsExchangeFuture.java | 4 +- .../dht/preloader/latch/ExchangeLatchManager.java | 3 +- .../cache/distributed/near/GridNearTxLocal.java | 3 +- .../processors/cache/local/GridLocalCache.java | 6 +-- .../cache/local/atomic/GridLocalAtomicCache.java | 9 ++-- .../processors/cache/mvcc/MvccProcessorImpl.java | 3 +- .../query/GridCacheDistributedQueryFuture.java | 6 +-- .../query/GridCacheDistributedQueryManager.java | 4 +- .../continuous/CacheContinuousQueryHandler.java | 3 +- .../cache/transactions/IgniteTxManager.java | 5 ++- .../PartitionCountersNeighborcastFuture.java | 3 +- .../processors/cluster/ClusterProcessor.java | 5 ++- .../cluster/GridClusterStateProcessor.java | 3 +- .../continuous/GridContinuousProcessor.java | 3 +- .../datastreamer/DataStreamProcessor.java | 3 +- .../processors/datastreamer/DataStreamerImpl.java | 5 ++- .../datastructures/DataStructuresProcessor.java | 5 ++- .../marshaller/GridMarshallerMappingProcessor.java | 3 +- .../PerformanceStatisticsProcessor.java | 3 +- .../handlers/cache/GridCacheCommandHandler.java | 5 ++- .../DataStructuresCommandHandler.java | 6 +-- .../rest/handlers/query/QueryCommandHandler.java | 8 ++-- .../processors/service/GridServiceProcessor.java | 3 +- .../processors/service/ServiceDeploymentTask.java | 3 +- .../internal/processors/task/GridTaskWorker.java | 5 ++- .../internal/visor/query/VisorQueryUtils.java | 5 ++- .../ignite/p2p/GridP2PLocalDeploymentSelfTest.java | 2 +- .../query/h2/twostep/GridMapQueryExecutor.java | 50 +++++++++++----------- 42 files changed, 136 insertions(+), 101 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java index 804c0ff..6930d5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteSchedulerImpl.java @@ -30,6 +30,8 @@ import org.apache.ignite.IgniteScheduler; import org.apache.ignite.internal.processors.security.OperationSecurityContext; import org.apache.ignite.internal.processors.security.SecurityUtils; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridPlainCallable; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.scheduler.SchedulerFuture; @@ -181,7 +183,7 @@ public class IgniteSchedulerImpl implements IgniteScheduler, Externalizable { } /** */ - private class SecurityAwareClosure<T> implements Runnable, Callable<T>, GridInternalWrapper<Object> { + private class SecurityAwareClosure<T> implements GridPlainRunnable, GridPlainCallable<T>, GridInternalWrapper<Object> { /** Security subject id. */ private final UUID secSubjId; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 1994457..8492982 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipC import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -1027,7 +1028,7 @@ public class CacheGroupContext { final List<Runnable> procC = skipCtx != null ? skipCtx.processClosures() : null; if (procC != null) { - ctx.kernalContext().closure().runLocalSafe(new Runnable() { + ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { for (Runnable c : procC) c.run(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index ff80d43..2eddcad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -122,6 +122,8 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.GridPlainCallable; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; @@ -1257,8 +1259,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Clear future. */ private IgniteInternalFuture<?> clearLocallyAsync(@Nullable final Set<? extends K> keys) { - return ctx.closures().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { + return ctx.closures().callLocalSafe(new GridPlainCallable<Object>() { + @Override public Object call() { if (keys == null) clearLocally(true, false, false); else @@ -3876,7 +3878,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public IgniteInternalFuture<?> localLoadCacheAsync(final IgniteBiPredicate<K, V> p, final Object[] args) { return ctx.closures().callLocalSafe( - ctx.projectSafe(new Callable<Object>() { + ctx.projectSafe(new GridPlainCallable<Object>() { @Nullable @Override public Object call() throws IgniteCheckedException { localLoadCache(p, args); @@ -4588,7 +4590,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V (IgniteOutClosure<IgniteInternalFuture>)() -> { GridFutureAdapter resFut = new GridFutureAdapter(); - ctx.kernalContext().closure().runLocalSafe(() -> { + ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> { IgniteInternalFuture fut0; if (ctx.kernalContext().isStopping()) @@ -5135,7 +5137,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert orig == null || orig.optimistic() || orig.readCommitted() || /*contains*/ skipVals; // Async check and recover if necessary. - return ctx.kernalContext().closure().callLocalSafe(new Callable<Void>() { + return ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Void>() { @Override public Void call() throws IgniteCheckedException { CacheOperationContext prevOpCtx = ctx.operationContextPerCall(); @@ -5365,12 +5367,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException { if (isLocal()) { - return ctx.kernalContext().closure().runLocalSafe(() -> { - try { - ctx.offheap().preloadPartition(part); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); + return ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { + @Override public void run() { + try { + ctx.offheap().preloadPartition(part); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } }); } @@ -6840,7 +6844,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { - ignite.context().closure().runLocalSafe(new Runnable() { + ignite.context().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { jobCtx.callcc(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 78a330e..2d28b5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -210,7 +211,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { handleMessage(nodeId, cacheMsg, plc); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 33e15a6..d9b38b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -361,7 +361,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { */ public void removeExplicitNodeLocks(UUID leftNodeId) { cctx.kernalContext().closure().runLocalSafe( - new Runnable() { + new GridPlainRunnable() { @Override public void run() { for (GridDistributedCacheEntry entry : locked()) { try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index a55e88a..126a27e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -134,6 +134,7 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -3688,7 +3689,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** {@inheritDoc} */ @Override public void onTimeout() { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { if (!busyLock.readLock().tryLock()) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java index 840bfd5..de0f1c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.util.deque.FastSizeDeque; @@ -153,7 +154,7 @@ public abstract class GridDeferredAckMessageSender<T> { /** {@inheritDoc} */ @Override public void onTimeout() { if (guard.compareAndSet(false, true)) { - c.runLocalSafe(new Runnable() { + c.runLocalSafe(new GridPlainRunnable() { @Override public void run() { writeLock().lock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index 3fc053d..64034ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -459,7 +460,7 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B final MiniFuture f = (MiniFuture)fut; if (f.nodeId().equals(nodeId)) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { f.onNodeLeft(nodeId); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index b52ec45..4bfb4fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1093,7 +1093,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param incomingReq Original ttl request. */ private void sendTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest incomingReq) { - ctx.closures().runLocalSafe(new Runnable() { + ctx.closures().runLocalSafe(new GridPlainRunnable() { @SuppressWarnings({"ForLoopReplaceableByForEach"}) @Override public void run() { Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java index 21b895d..08843bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.UpdateSourceIterator; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -1035,7 +1036,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd if (nearNodeId.equals(nodeId)) onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']')); else if (pending != null && pending.remove(nodeId) != null) - cctx.kernalContext().closure().runLocalSafe(() -> continueLoop(false)); + cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> continueLoop(false)); } catch (Exception e) { onDone(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index 1dcf4e4..110b4e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -918,7 +918,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec * @param topVer Topology version. */ private void remap(final AffinityTopologyVersion topVer) { - cctx.closures().runLocalSafe(new Runnable() { + cctx.closures().runLocalSafe(new GridPlainRunnable() { @Override public void run() { // If topology changed reset collection of invalid nodes. synchronized (this) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index 18d2684..29a2645 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -373,7 +374,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { mapOnTopology(); } @@ -432,7 +433,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { mapOnTopology(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 6045b96..fb37a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -498,7 +499,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { mapOnTopology(); } @@ -652,7 +653,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { mapOnTopology(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index bae41ca..7c022ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridIterableAdapter; import org.apache.ignite.internal.util.lang.GridIterableAdapter.IteratorWrapper; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgnitePredicateX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -1362,14 +1363,14 @@ public class GridDhtPartitionDemander { } if (waitCnt.decrementAndGet() == 0) - ctx.kernalContext().closure().runLocalSafe(() -> requestPartitions0(node, parts, d)); + ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d)); } }); } // The special case for historical only rebalancing. if (d.partitions().fullSet().isEmpty() && !d.partitions().historicalSet().isEmpty()) - ctx.kernalContext().closure().runLocalSafe(() -> requestPartitions0(node, parts, d)); + ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> requestPartitions0(node, parts, d)); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 65e4b4d..b49dd73 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -32,7 +32,6 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -119,6 +118,7 @@ import org.apache.ignite.internal.processors.tracing.SpanTags; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.TimeBag; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -5174,7 +5174,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert newCrdFut != null; - cctx.kernalContext().closure().callLocal(new Callable<Void>() { + cctx.kernalContext().closure().callLocal(new GridPlainCallable<Void>() { @Override public Void call() throws Exception { try { newCrdFut.init(GridDhtPartitionsExchangeFuture.this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java index e09f8a0..a9a8fb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -134,7 +135,7 @@ public class ExchangeLatchManager { // Do not process from discovery thread. // TODO: Should use queue to guarantee the order of processing left nodes. - ctx.closure().runLocalSafe(() -> processNodeLeft(cache.version(), e.eventNode())); + ctx.closure().runLocalSafe((GridPlainRunnable)() -> processNodeLeft(cache.version(), e.eventNode())); }, EVT_NODE_LEFT, EVT_NODE_FAILED); ctx.event().addDiscoveryEventListener((e, cache) -> { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f1f99d3..babdf80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -97,6 +97,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridInClosure3; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; @@ -5159,7 +5160,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } if (proceed || (state() == MARKED_ROLLBACK)) { - cctx.kernalContext().closure().runLocalSafe(new Runnable() { + cctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { // Note: if rollback asynchronously on timeout should not clear thread map // since thread started tx still should be able to see this tx. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index d33aff2..7e91fbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.local; import java.io.Externalizable; import java.util.Collection; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -37,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.transactions.TransactionIsolation; @@ -189,7 +189,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync() { - return ctx.closures().callLocalSafe(new Callable<Void>() { + return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() { @Override public Void call() throws Exception { removeAll(); @@ -240,7 +240,7 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException { - return ctx.closures().callLocalSafe(new Callable<Void>() { + return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() { @Override public Void call() throws Exception { preloadPartition(part); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 982e296..81ee613 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.resource.GridResourceIoc; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -286,7 +287,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> removeAllAsync() { - return ctx.closures().callLocalSafe(new Callable<Void>() { + return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() { @Override public Void call() throws Exception { removeAll(); @@ -350,7 +351,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final boolean storeEnabled = ctx.readThrough(); - return asyncOp(new Callable<Map<K, V>>() { + return asyncOp(new GridPlainCallable<Map<K, V>>() { @Override public Map<K, V> call() throws Exception { return getAllInternal(keys, storeEnabled, taskName, deserializeBinary, skipVals, needVer); } @@ -775,7 +776,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - return asyncOp(new Callable<Object>() { + return asyncOp(new GridPlainCallable<Object>() { @Override public Object call() throws Exception { return updateAllInternal(op, keys, @@ -817,7 +818,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - return asyncOp(new Callable<Object>() { + return asyncOp(new GridPlainCallable<Object>() { @Override public Object call() throws Exception { return updateAllInternal(DELETE, keys, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java index 9fbf83c..da9aab5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java @@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -654,7 +655,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce // Complete init future if local node is a new coordinator. All previous txs have been already completed here. if (curCrd0.local()) - ctx.closure().runLocalSafe(initFut::onDone); + ctx.closure().runLocalSafe((GridPlainRunnable)initFut::onDone); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index 0019df0..8f08dea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -22,13 +22,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.U; @@ -105,8 +105,8 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu cctx.deploymentEnabled()); // Process cancel query directly (without sending) for local node, - cctx.closures().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { + cctx.closures().callLocalSafe(new GridPlainCallable<Object>() { + @Override public Object call() { qryMgr.processQueryRequest(cctx.localNodeId(), req); return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 5474fc5..08fbfe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; @@ -43,6 +42,7 @@ import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -894,7 +894,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } if (locNode != null) { - cctx.closures().callLocalSafe(new Callable<Object>() { + cctx.closures().callLocalSafe(new GridPlainCallable<Object>() { @Override public Object call() throws Exception { req.beforeLocalExecution(cctx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ee0e182..1573e8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -1341,7 +1342,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler final UUID routineId, final GridKernalContext ctx) { if (t != null) { - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { GridCacheContext<K, V> cctx = cacheContext(ctx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a561b9b..fb3a892 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.TimeBag; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; import org.apache.ignite.internal.util.typedef.CI1; @@ -2184,7 +2185,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { scheduleDumpTask( IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, () -> cctx.kernalContext().closure().runLocalSafe( - () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout)), + (GridPlainRunnable)() -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout)), longOpsDumpTimeout); } @@ -3240,7 +3241,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** * Transactions recovery initialization runnable. */ - private final class TxRecoveryInitRunnable implements Runnable { + private final class TxRecoveryInitRunnable implements GridPlainRunnable { /** */ private final ClusterNode node; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java index 39b79d4..a4ac865 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage; import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -174,7 +175,7 @@ public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdenti MiniFuture mini = (MiniFuture)fut; if (mini.nodeId.equals(nodeId)) { - cctx.kernalContext().closure().runLocalSafe(mini::onDone); + cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)mini::onDone); return; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 1f2a107..7625368 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.util.GridTimerTask; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; @@ -240,7 +241,7 @@ public class ClusterProcessor extends GridProcessorAdapter implements Distribute ", previous value was " + oldVal.tag(); - ctx.closure().runLocalSafe(() -> ctx.event().record( + ctx.closure().runLocalSafe((GridPlainRunnable)() -> ctx.event().record( new ClusterTagUpdatedEvent( ctx.discovery().localNode(), msg, @@ -278,7 +279,7 @@ public class ClusterProcessor extends GridProcessorAdapter implements Distribute this.metastorage = metastorage; ctx.closure().runLocalSafe( - () -> { + (GridPlainRunnable)() -> { try { ClusterIdAndTag idAndTag = new ClusterIdAndTag(cluster.id(), cluster.tag()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index 085d38c..a774d520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -1412,7 +1413,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I checkLocalNodeInBaseline(globalState.baselineTopology()); - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { boolean client = ctx.clientNode(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 5e31c41..293225a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -702,7 +703,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (ctx.config().isPeerClassLoadingEnabled()) { // Peer class loading cannot be performed before a node joins, so we delay the deployment. // Run the deployment task in the system pool to avoid blocking of the discovery thread. - ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe(() -> { + ctx.discovery().localJoinFuture().listen(f -> ctx.closure().runLocalSafe((GridPlainRunnable)() -> { try { hnd.p2pUnmarshal(srcNodeId, ctx); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 9069318..0e4f463 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -233,7 +234,7 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { fut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> t) { - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { processRequest(nodeId, req); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index a6d7182..7c24263 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -331,7 +332,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Only async notification is possible since // discovery thread may be trapped otherwise. if (buf != null) { - waitAffinityAndRun(new Runnable() { + waitAffinityAndRun(new GridPlainRunnable() { @Override public void run() { buf.onNodeLeft(); } @@ -1068,7 +1069,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed if (bufMappings.remove(nodeId, buf)) { final Buffer buf0 = buf; - waitAffinityAndRun(new Runnable() { + waitAffinityAndRun(new GridPlainRunnable() { @Override public void run() { buf0.onNodeLeft(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 232f5fc..6a8c400 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.IgniteClosureX; import org.apache.ignite.internal.util.lang.IgniteInClosureX; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; @@ -148,8 +149,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen // This may require cache operation to execute, // therefore cannot use event notification thread. ctx.closure().callLocalSafe( - new Callable<Object>() { - @Override public Object call() throws Exception { + new GridPlainCallable<Object>() { + @Override public Object call() { DiscoveryEvent discoEvt = (DiscoveryEvent)evt; UUID leftNodeId = discoEvt.eventNode().id(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java index 663a3f7..bb8b5e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.spi.discovery.DiscoveryDataBag; @@ -304,7 +305,7 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter { final MarshallerMappingItem item = msg.getMappingItem(); marshallerCtx.onMappingAccepted(item); - closProc.runLocalSafe(new Runnable() { + closProc.runLocalSafe(new GridPlainRunnable() { @Override public void run() { for (MappingUpdatedListener lsnr : mappingUpdatedLsnrs) lsnr.mappingUpdated(item.platformId(), item.typeId(), item.className()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java index f648114..eec7c1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage; import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener; import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage; import org.apache.ignite.internal.util.GridIntList; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -232,7 +233,7 @@ public class PerformanceStatisticsProcessor extends GridProcessorAdapter { /** Starts or stops collecting statistics on metastorage update. */ private void onMetastorageUpdate(boolean start) { - ctx.closure().runLocalSafe(() -> { + ctx.closure().runLocalSafe((GridPlainRunnable)() -> { if (start) startWriter(); else diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 22e9d51..c45f77e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.lang.IgniteClosure2X; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -218,7 +219,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { if (val == null) throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); - return ctx.closure().callLocalSafe(new Callable<Object>() { + return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() { @Override public Object call() throws Exception { EntryProcessorResult<Boolean> res = cache.invoke(key, new EntryProcessor<Object, Object, Boolean>() { @Override public Boolean process(MutableEntry<Object, Object> entry, @@ -1729,7 +1730,7 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { GridKernalContext ctx) { assert c != null; - return ctx.closure().callLocalSafe(new Callable<Object>() { + return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() { @Override public Object call() throws Exception { EntryProcessorResult<Boolean> res = c.invoke(key, new EntryProcessor<Object, Object, Boolean>() { @Override public Boolean process(MutableEntry<Object, Object> entry, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java index df4aabc..fb87c12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/datastructures/DataStructuresCommandHandler.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.rest.handlers.datastructures; import java.util.Collection; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteAtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; @@ -29,6 +28,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle import org.apache.ignite.internal.processors.rest.request.DataStructuresRequest; import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.internal.U; @@ -97,8 +97,8 @@ public class DataStructuresCommandHandler extends GridRestCommandHandlerAdapter return new GridFinishedFuture(err); } - return ctx.closure().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { + return ctx.closure().callLocalSafe(new GridPlainCallable<Object>() { + @Override public Object call() { Long init = req.initial(); Long delta = req.delta(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 384ba35..cdb92dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -25,7 +25,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; @@ -47,6 +46,7 @@ import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandle import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.processors.rest.request.RestQueryRequest; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -261,7 +261,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** * Execute query callable. */ - private static class ExecuteQueryCallable implements Callable<GridRestResponse> { + private static class ExecuteQueryCallable implements GridPlainCallable<GridRestResponse> { /** Kernal context. */ private GridKernalContext ctx; @@ -401,7 +401,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** * Close query callable. */ - private static class CloseQueryCallable implements Callable<GridRestResponse> { + private static class CloseQueryCallable implements GridPlainCallable<GridRestResponse> { /** Current queries cursors. */ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs; @@ -452,7 +452,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** * Fetch query callable. */ - private static class FetchQueryCallable implements Callable<GridRestResponse> { + private static class FetchQueryCallable implements GridPlainCallable<GridRestResponse> { /** Current queries cursors. */ private final ConcurrentHashMap<Long, QueryCursorIterator> qryCurs; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index c0f624f..4836991 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -257,7 +258,7 @@ public class GridServiceProcessor extends ServiceProcessorAdapter implements Ign else { // Listener for client nodes is registered in onContinuousProcessorStarted method. assert !ctx.isDaemon(); - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { try { Iterable<CacheEntryEvent<?, ?>> entries = diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java index 7778dc2..29cd6ad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -457,7 +458,7 @@ class ServiceDeploymentTask { if (isCompleted()) return; - ctx.closure().runLocalSafe(() -> { + ctx.closure().runLocalSafe((GridPlainRunnable)() -> { try { ServiceDeploymentActions depResults = msg.servicesDeploymentActions(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 1b74977..539676c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.closure.AffinityTask; import org.apache.ignite.internal.processors.service.GridServiceNotFoundException; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -1005,7 +1006,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec if (waitForAffTop && affFut != null) { affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> fut0) { - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { onResponse(failoverRes); } @@ -1024,7 +1025,7 @@ public class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObjec private void sendRetryRequest(final long waitms, final GridJobResultImpl jRes, final GridJobExecuteResponse resp) { ctx.timeout().schedule(new Runnable() { @Override public void run() { - ctx.closure().runLocalSafe(new Runnable() { + ctx.closure().runLocalSafe(new GridPlainRunnable() { @Override public void run() { try { ClusterNode newNode = ctx.affinity().mapPartitionToNode(affCacheName, affPartId, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java index 7979d87..9e19fb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryUtils.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.SB; @@ -360,7 +361,7 @@ public class VisorQueryUtils { final VisorQueryTaskArg arg, final GridQueryCancel cancel ) { - ignite.context().closure().runLocalSafe(() -> { + ignite.context().closure().runLocalSafe((GridPlainRunnable)() -> { try { SqlFieldsQuery qry = new SqlFieldsQuery(arg.getQueryText()); @@ -440,7 +441,7 @@ public class VisorQueryUtils { final VisorQueryHolder holder, final VisorScanQueryTaskArg arg ) { - ignite.context().closure().runLocalSafe(() -> { + ignite.context().closure().runLocalSafe((GridPlainRunnable)() -> { try { IgniteCache<Object, Object> c = ignite.cache(arg.getCacheName()); String filterText = arg.getFilter(); diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java index 6b2dbd4..4459e47 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PLocalDeploymentSelfTest.java @@ -257,7 +257,7 @@ public class GridP2PLocalDeploymentSelfTest extends GridCommonAbstractTest { @Override public void run() { stop.set(true); } - }, 10, TimeUnit.SECONDS); + }, 5, TimeUnit.SECONDS); fut.get(); } finally { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 7d0d4ee..4e7b163 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -74,6 +73,7 @@ import org.apache.ignite.internal.processors.tracing.MTC; import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings; import org.apache.ignite.internal.processors.tracing.Span; import org.apache.ignite.internal.processors.tracing.SpanType; +import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -237,32 +237,32 @@ public class GridMapQueryExecutor { Span span = MTC.span(); ctx.closure().callLocal( - (Callable<Void>)() -> { + (GridPlainCallable<Void>)() -> { try (TraceSurroundings ignored = MTC.supportContinual(span)) { - onQueryRequest0(node, - req.requestId(), - segment, - req.schemaName(), - req.queries(), - cacheIds, - req.topologyVersion(), - partsMap, - parts, - req.pageSize(), - distributedJoins, - enforceJoinOrder, - false, - timeout, - params, - lazy, - req.mvccSnapshot(), - dataPageScanEnabled, - treatReplicatedAsPartitioned - ); + onQueryRequest0(node, + req.requestId(), + segment, + req.schemaName(), + req.queries(), + cacheIds, + req.topologyVersion(), + partsMap, + parts, + req.pageSize(), + distributedJoins, + enforceJoinOrder, + false, + timeout, + params, + lazy, + req.mvccSnapshot(), + dataPageScanEnabled, + treatReplicatedAsPartitioned + ); - return null; - } - }, + return null; + } + }, QUERY_POOL); }