This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8a1f0731532 IGNITE-20178 Introduce param-free
IgniteInternalFuture.listen(() -> {}) in addition to .listen((fut) -> {}) to
avoid ignored params (#10885)
8a1f0731532 is described below
commit 8a1f0731532802cdc8ee681d28dc324c76c7ba80
Author: Anton Vinogradov <[email protected]>
AuthorDate: Mon Aug 14 13:30:30 2023 +0300
IGNITE-20178 Introduce param-free IgniteInternalFuture.listen(() -> {}) in
addition to .listen((fut) -> {}) to avoid ignored params (#10885)
---
.../GridCommandHandlerIndexRebuildStatusTest.java | 4 +-
.../ignite/internal/GridEventConsumeHandler.java | 4 +-
.../ignite/internal/GridMessageListenHandler.java | 4 +-
.../ignite/internal/IgniteInternalFuture.java | 33 ++++++++++++++++-
.../internal/client/thin/TcpClientChannel.java | 4 +-
.../io/gridnioserver/GridNioClientConnection.java | 3 +-
.../cluster/DistributedConfigurationUtils.java | 4 +-
.../internal/management/cache/IdleVerifyJob.java | 4 +-
.../management/meta/MetadataRemoveTask.java | 6 +--
.../managers/encryption/GridEncryptionManager.java | 6 +--
.../managers/indexing/IndexesRebuildTask.java | 4 +-
.../IgniteAuthenticationProcessor.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 41 ++++++++++-----------
.../processors/cache/GridCacheMvccManager.java | 2 +-
.../distributed/GridCacheTxRecoveryFuture.java | 4 +-
.../distributed/GridDistributedCacheAdapter.java | 4 +-
.../dht/CacheDistributedGetFutureAdapter.java | 2 +-
.../cache/distributed/dht/GridDhtLockFuture.java | 2 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 12 +++---
.../distributed/dht/GridPartitionedGetFuture.java | 6 +--
.../dht/GridPartitionedSingleGetFuture.java | 8 ++--
.../dht/colocated/GridDhtColocatedLockFuture.java | 2 +-
.../dht/preloader/GridDhtPartitionDemander.java | 2 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 3 +-
.../dht/preloader/latch/ExchangeLatchManager.java | 2 +-
.../dht/topology/PartitionsEvictManager.java | 2 +-
.../cache/distributed/near/GridNearGetFuture.java | 4 +-
.../cache/distributed/near/GridNearLockFuture.java | 8 ++--
.../near/GridNearTxAbstractEnlistFuture.java | 6 +--
.../distributed/near/GridNearTxEnlistFuture.java | 3 +-
.../distributed/near/GridNearTxFinishFuture.java | 8 ++--
.../cache/distributed/near/GridNearTxLocal.java | 30 +++++++--------
.../near/GridNearTxQueryEnlistFuture.java | 2 +-
.../near/GridNearTxQueryResultsEnlistFuture.java | 3 +-
.../GridNearReadRepairAbstractFuture.java | 4 +-
.../GridNearReadRepairCheckOnlyFuture.java | 4 +-
.../near/consistency/GridNearReadRepairFuture.java | 2 +-
.../processors/cache/mvcc/MvccProcessorImpl.java | 8 ++--
.../cache/mvcc/MvccQueryTrackerImpl.java | 2 +-
.../GridCacheDatabaseSharedManager.java | 2 +-
.../checkpoint/CheckpointContextImpl.java | 2 +-
.../checkpoint/CheckpointProgressImpl.java | 6 +--
.../CachePartitionDefragmentationManager.java | 8 ++--
.../snapshot/IgniteSnapshotManager.java | 36 +++++++++---------
.../snapshot/IncrementalSnapshotFutureTask.java | 8 ++--
.../snapshot/IncrementalSnapshotMarkWalFuture.java | 4 +-
.../persistence/snapshot/SnapshotFutureTask.java | 2 +-
.../snapshot/SnapshotRestoreProcess.java | 12 +++---
.../continuous/CacheContinuousQueryHandler.java | 6 +--
.../cache/transactions/IgniteTxHandler.java | 16 ++++----
.../cache/transactions/IgniteTxLocalAdapter.java | 6 +--
.../cache/transactions/IgniteTxManager.java | 18 ++++-----
.../cluster/GridClusterStateProcessor.java | 3 +-
.../persistence/DmsDataWriterWorker.java | 2 +-
.../processors/odbc/ClientListenerNioListener.java | 4 +-
.../platform/client/compute/ClientComputeTask.java | 12 +++---
.../platform/compute/PlatformCompute.java | 17 +++++++++
.../processors/query/GridQueryProcessor.java | 4 +-
.../query/running/RunningQueryManager.java | 2 +-
.../query/schema/SchemaIndexCacheVisitorImpl.java | 10 ++---
.../stat/IgniteStatisticsConfigurationManager.java | 4 +-
.../processors/service/ServiceDeploymentTask.java | 8 ++--
.../util/distributed/DistributedProcess.java | 14 +++----
.../internal/util/future/GridFinishedFuture.java | 43 ++++++++++++++++------
.../internal/util/future/GridFutureAdapter.java | 25 +++++++++----
.../util/nio/GridSelectorNioSessionImpl.java | 4 +-
.../internal/util/nio/ssl/GridNioSslFilter.java | 2 +-
.../tcp/internal/ConnectionClientPool.java | 4 +-
.../db/IgnitePdsDataRegionMetricsTest.java | 2 +-
.../snapshot/IgniteSnapshotRemoteRequestTest.java | 4 +-
.../TxPartitionCounterStateAbstractTest.java | 2 +-
.../TxRecoveryWithConcurrentRollbackTest.java | 4 +-
.../continuous/GridEventConsumeSelfTest.java | 2 +-
.../rest/RestProcessorInitializationTest.java | 2 +-
.../DistributedProcessCoordinatorLeftTest.java | 2 +-
.../apache/ignite/testframework/GridTestUtils.java | 4 +-
.../query/h2/GridIndexRebuildSelfTest.java | 4 +-
.../processors/query/h2/QueryDataPageScanTest.java | 4 +-
78 files changed, 314 insertions(+), 254 deletions(-)
diff --git
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
index 6206f44ba6d..9ed302c6bc7 100644
---
a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
+++
b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexRebuildStatusTest.java
@@ -27,7 +27,6 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
import
org.apache.ignite.internal.management.cache.IndexRebuildStatusInfoContainer;
@@ -37,7 +36,6 @@ import
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
@@ -247,7 +245,7 @@ public class GridCommandHandlerIndexRebuildStatusTest
extends GridCommandHandler
SchemaIndexCacheVisitorClosure clo, IndexRebuildCancelToken
cancel) {
idxRebuildsStartedNum.incrementAndGet();
- fut.listen((CI1<IgniteInternalFuture<?>>)f ->
idxRebuildsStartedNum.decrementAndGet());
+ fut.listen(() -> idxRebuildsStartedNum.decrementAndGet());
super.startRebuild(cctx, fut, new
BlockingSchemaIndexCacheVisitorClosure(clo), cancel);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 8b8a89d3a05..ab3c8f11837 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -275,8 +275,8 @@ class GridEventConsumeHandler implements
GridContinuousHandler {
if (F.isEmpty(types))
types = EVTS_ALL;
- p2pUnmarshalFut.listen((fut) -> {
- if (fut.error() == null) {
+ p2pUnmarshalFut.listen(() -> {
+ if (p2pUnmarshalFut.error() == null) {
try {
initFilter(filter, ctx);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index b5df29443cc..0ea7bcbe58f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -129,8 +129,8 @@ public class GridMessageListenHandler implements
GridContinuousHandler {
/** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId,
final GridKernalContext ctx) {
- p2pUnmarshalFut.listen((fut) -> {
- if (fut.error() == null)
+ p2pUnmarshalFut.listen(() -> {
+ if (p2pUnmarshalFut.error() == null)
ctx.io().addUserMessageListener(topic, pred, nodeId);
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 3fe750756d2..8f1cc9a5f25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Async;
/**
@@ -106,11 +108,19 @@ public interface IgniteInternalFuture<R> {
/**
* Registers listener closure to be asynchronously notified whenever
future completes.
*
- * @param lsnr Listener closure to register. If not provided - this method
is no-op.
+ * @param lsnr Listener closure to register.
*/
@Async.Schedule
public void listen(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr);
+ /**
+ * Registers parameter-free listener closure to be asynchronously notified whenever future completes.
+ *
+ * @param lsnr Listener closure to register.
+ */
+ @Async.Schedule
+ public void listen(IgniteRunnable lsnr);
+
/**
* Make a chained future to convert result of this future (when complete)
into a new format.
* It is guaranteed that done callback will be called only ONCE.
@@ -121,6 +131,16 @@ public interface IgniteInternalFuture<R> {
@Async.Schedule
public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super
IgniteInternalFuture<R>, T> doneCb);
+ /**
+ * Make a chained future to convert result of this future (when complete)
into a new format.
+ * It is guaranteed that done callback will be called only ONCE.
+ *
+ * @param doneCb Done callback that is called without arguments when this future finishes to produce chained future result.
+ * @return Chained future that finishes after this future completes and
done callback is called.
+ */
+ @Async.Schedule
+ public <T> IgniteInternalFuture<T> chain(IgniteOutClosure<T> doneCb);
+
/**
* Make a chained future to convert result of this future (when complete)
into a new format.
* It is guaranteed that done callback will be called only ONCE.
@@ -132,6 +152,17 @@ public interface IgniteInternalFuture<R> {
@Async.Schedule
public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super
IgniteInternalFuture<R>, T> doneCb, Executor exec);
+ /**
+ * Make a chained future to convert result of this future (when complete)
into a new format.
+ * It is guaranteed that done callback will be called only ONCE.
+ *
+ * @param doneCb Done callback that is called without arguments when this future finishes to produce chained future result.
+ * @param exec Executor to run callback.
+ * @return Chained future that finishes after this future completes and
done callback is called.
+ */
+ @Async.Schedule
+ public <T> IgniteInternalFuture<T> chain(IgniteOutClosure<T> doneCb,
Executor exec);
+
/**
* Make a chained future that is completed when {@code doneCb} is
executed. Callback is called with this future
* as the argument, when this future completes. It is guaranteed that done
callback will be called only ONCE.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index e5056277e53..63ecf97a269 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -439,9 +439,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ClientOperation op = pendingReq.operation;
long startTimeNanos = pendingReq.startTimeNanos;
- pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(()
-> {
+ pendingReq.listen(() -> asyncContinuationExecutor.execute(() -> {
try {
- ByteBuffer payload = payloadFut.get();
+ ByteBuffer payload = pendingReq.get();
T res = null;
if (payload != null && payloadReader != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
index fe2e894acde..ba65707439f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.client.thin.io.gridnioserver;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.client.thin.io.ClientConnection;
import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
@@ -76,7 +75,7 @@ class GridNioClientConnection implements ClientConnection {
/** {@inheritDoc} */
@Override public void send(ByteBuffer msg, @Nullable Runnable onDone)
throws IgniteCheckedException {
if (onDone != null)
- ses.send(msg).listen(f -> onDone.run());
+ ses.send(msg).listen(onDone::run);
else
ses.sendNoFuture(msg, null);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
index f3fe31bf3f8..1041aed386e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/DistributedConfigurationUtils.java
@@ -21,10 +21,8 @@ import java.io.Serializable;
import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributePropertyListener;
import
org.apache.ignite.internal.processors.configuration.distributed.DistributedProperty;
-import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.NotNull;
import static java.lang.String.format;
@@ -49,7 +47,7 @@ public final class DistributedConfigurationUtils {
if (property.get() == null) {
try {
property.propagateAsync(null, value)
- .listen((IgniteInClosure<IgniteInternalFuture<?>>)future
-> {
+ .listen(future -> {
if (future.error() != null)
log.error("Cannot set default value of '" +
property.getName() + '\'', future.error());
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyJob.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyJob.java
index 11c65f5f520..da8d665371b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyJob.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/cache/IdleVerifyJob.java
@@ -26,9 +26,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorJob;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoggerResource;
+
import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
/**
@@ -75,7 +75,7 @@ class IdleVerifyJob<ResultT> extends
VisorJob<CacheIdleVerifyCommandArg, ResultT
if (!fut.isDone()) {
jobCtx.holdcc();
-
fut.listen((IgniteInClosure<IgniteInternalFuture<ResultT>>)f ->
jobCtx.callcc());
+ fut.listen(() -> jobCtx.callcc());
return null;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/management/meta/MetadataRemoveTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/management/meta/MetadataRemoveTask.java
index e27e701d33b..27000f51b41 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/management/meta/MetadataRemoveTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/management/meta/MetadataRemoveTask.java
@@ -31,12 +31,12 @@ import
org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.VisorJob;
import org.apache.ignite.internal.visor.VisorMultiNodeTask;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.security.SecurityPermissionSet;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.jetbrains.annotations.Nullable;
+
import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
import static
org.apache.ignite.internal.management.meta.MetadataInfoTask.typeId;
import static
org.apache.ignite.internal.processors.task.TaskExecutionOptions.options;
@@ -125,8 +125,8 @@ public class MetadataRemoveTask extends
VisorMultiNodeTask<MetaRemoveCommandArg,
jobCtx.holdcc();
- future.listen((IgniteInClosure<IgniteInternalFuture<?>>)f
-> {
- if (f.isDone())
+ future.listen(() -> {
+ if (future.isDone())
jobCtx.callcc();
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
index f1f5f11c553..9b9c30ccb08 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/GridEncryptionManager.java
@@ -1252,10 +1252,10 @@ public class GridEncryptionManager extends
GridManagerAdapter<EncryptionSpi> imp
for (int grpId : grpIds) {
IgniteInternalFuture<?> fut = pageScanner.schedule(grpId);
- fut.listen(f -> {
- if (f.isCancelled() || f.error() != null) {
+ fut.listen(() -> {
+ if (fut.isCancelled() || fut.error() != null) {
log.warning("Reencryption " +
- (f.isCancelled() ? "cancelled" : "failed") + " [grp="
+ grpId + "]", f.error());
+ (fut.isCancelled() ? "cancelled" : "failed") + "
[grp=" + grpId + "]", fut.error());
return;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
index f317d0bbfec..847fcce196b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/indexing/IndexesRebuildTask.java
@@ -116,8 +116,8 @@ public class IndexesRebuildTask {
cctx.kernalContext().query().onStartRebuildIndexes(cctx, recreate);
- rebuildCacheIdxFut.listen(fut -> {
- Throwable err = fut.error();
+ rebuildCacheIdxFut.listen(() -> {
+ Throwable err = rebuildCacheIdxFut.error();
if (err == null) {
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index 23022eebf80..1d3f458a064 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -183,7 +183,7 @@ public class IgniteAuthenticationProcessor extends
GridProcessorAdapter implemen
discoMgr.setCustomEventListener(UserAcceptedMessage.class, new
UserAcceptedListener());
- discoMgr.localJoinFuture().listen(fut -> onLocalJoin());
+ discoMgr.localJoinFuture().listen(this::onLocalJoin);
discoLsnr = (evt, discoCache) -> {
if (ctx.isStopping())
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index f24ed990762..5aab76e7912 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1351,7 +1351,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateGetAllTimeStatClosure<>(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
return fut;
}
@@ -1461,7 +1461,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateGetTimeStatClosure<V>(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET, start));
return fut;
}
@@ -1518,7 +1518,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new
UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET, start));
return fr;
}
@@ -1624,7 +1624,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateGetAllTimeStatClosure<Map<K, V>>(metrics0(),
start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
return fut;
}
@@ -1679,7 +1679,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateGetAllTimeStatClosure<Map<K,
EntryGetResult>>(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET_ALL,
start));
return rf;
}
@@ -2425,7 +2425,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(),
start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_GET_AND_PUT,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_GET_AND_PUT,
start));
return fut;
}
@@ -2441,8 +2441,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
@Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<V>() {
@Override public IgniteInternalFuture<V> op(GridNearTxLocal tx,
AffinityTopologyVersion readyTopVer) {
- return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
-
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
+ return tx.putAsync(ctx, readyTopVer, key, val, true,
filter).chain(RET2VAL);
}
@Override public String toString() {
@@ -2879,7 +2878,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(),
start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_PUT, start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_PUT, start));
return fut;
}
@@ -2899,8 +2898,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
key,
val,
false,
- filter).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
Boolean>)RET2FLAG);
+ filter).chain(RET2FLAG);
}
@Override public String toString() {
@@ -3024,7 +3022,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdatePutAllTimeStatClosure<>(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_PUT_ALL,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_PUT_ALL,
start));
return fut;
}
@@ -3121,7 +3119,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateGetAndRemoveTimeStatClosure<V>(metrics0(),
start));
if (performanceStatsEnabled)
- fut.listen(f ->
writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start));
+ fut.listen(() ->
writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start));
return fut;
}
@@ -3139,7 +3137,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
Collections.singletonList(key),
/*retval*/true,
null,
-
/*singleRmv*/false).chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>)RET2VAL);
+ /*singleRmv*/false).chain(RET2VAL);
}
@Override public String toString() {
@@ -3232,7 +3230,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateRemoveAllTimeStatClosure<>(metrics0(),
start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE_ALL,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE_ALL,
start));
return fut;
}
@@ -3343,7 +3341,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
fut.listen(new UpdateRemoveTimeStatClosure(metrics0(), start));
if (performanceStatsEnabled)
- fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE,
start));
+ fut.listen(() -> writeStatistics(OperationType.CACHE_REMOVE,
start));
return fut;
}
@@ -3361,8 +3359,7 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
Collections.singletonList(key),
/*retval*/false,
filter,
- /*singleRmv*/true).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
Boolean>)RET2FLAG);
+ /*singleRmv*/true).chain(RET2FLAG);
}
@Override public String toString() {
@@ -4471,9 +4468,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
}
}
-
fut0.listen((IgniteInClosure<IgniteInternalFuture>)fut01 -> {
+ fut0.listen(() -> {
try {
- resFut.onDone(fut01.get());
+ resFut.onDone(fut0.get());
}
catch (Throwable ex) {
resFut.onDone(ex);
@@ -4922,9 +4919,9 @@ public abstract class GridCacheAdapter<K, V> implements
IgniteInternalCache<K, V
GridFutureAdapter<R> fut = new GridFutureAdapter<>();
- orig.listen((f) -> {
+ orig.listen(() -> {
try {
- fut.onDone(f.get());
+ fut.onDone(orig.get());
}
catch (IgniteConsistencyViolationException e1) {
repair.apply(e1).listen((repFut) -> {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 6a35091d652..8bbde4d0623 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -352,7 +352,7 @@ public class GridCacheMvccManager extends
GridCacheSharedManagerAdapter {
*/
private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) {
GridFutureAdapter<?> wrapper = new GridFutureAdapter();
- f.listen(future -> wrapper.onDone());
+ f.listen(() -> wrapper.onDone());
return wrapper;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index e75a1de09ee..a1a7de9aea2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -139,7 +139,7 @@ public class GridCacheTxRecoveryFuture extends
GridCacheCompoundIdentityFuture<B
if (cctx.localNodeId().equals(nearNodeId)) {
IgniteInternalFuture<Boolean> fut =
cctx.tm().txCommitted(tx.nearXidVersion());
- fut.listen((IgniteInternalFuture<Boolean> fut0) -> {
+ fut.listen(() -> {
try {
onDone(fut.get());
}
@@ -217,7 +217,7 @@ public class GridCacheTxRecoveryFuture extends
GridCacheCompoundIdentityFuture<B
}
}
else {
- fut.listen((IgniteInternalFuture<Boolean> fut0) -> {
+ fut.listen(() -> {
boolean prepared;
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 4a5abb62b25..f33199bdc26 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -484,12 +484,12 @@ public abstract class GridDistributedCacheAdapter<K, V>
extends GridCacheAdapter
IgniteInternalFuture<Boolean> lastFut =
ctx.lastRemoveAllJobFut().get();
if (lastFut != locFut) {
-
lastFut.listen((IgniteInClosure<IgniteInternalFuture<Boolean>>)fut -> {
+ lastFut.listen(() -> {
if (lastFut.error() != null)
locFut.onDone(lastFut.error());
else {
try {
- completeWithResult(fut.get());
+ completeWithResult(lastFut.get());
}
catch (IgniteCheckedException ignored)
{
// Should be never thrown.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 9039b173e4a..646e3245b84 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -481,7 +481,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V>
cctx.shared().exchange()
.affinityReadyFuture(awaitTopVer)
- .listen((f) -> {
+ .listen(f -> {
try {
// Remap.
map(keys.keySet(), F.t(node, keys), f.get());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 2859dc3e849..95a32f2a314 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -276,7 +276,7 @@ public final class GridDhtLockFuture extends
GridCacheCompoundIdentityFuture<Boo
assert fut instanceof GridDhtColocatedLockFuture : fut;
// Terminate this future if parent(collocated) future
is terminated by rollback.
- fut.listen((IgniteInternalFuture<?> fut0) -> {
+ fut.listen(() -> {
try {
fut.get();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index a4ace267401..49f009119c8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -426,7 +426,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter
implements GridCacheMa
*/
final IgniteInternalFuture finalPrepFut = prepFut;
- lockFut.listen((IgniteInternalFuture<?> ignored) ->
finishTx(false, finalPrepFut, fut));
+ lockFut.listen(() -> finishTx(false, finalPrepFut, fut));
return;
}
@@ -497,7 +497,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter
implements GridCacheMa
if (prep.isDone())
finishTx(true, prep, fut);
else
- prep.listen((IgniteInternalFuture<?> f) -> finishTx(true, f,
fut));
+ prep.listen(() -> finishTx(true, prep, fut));
}
else {
assert optimistic();
@@ -540,12 +540,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter
implements GridCacheMa
cctx.mvcc().addFuture(fut, fut.futureId());
- GridDhtTxPrepareFuture prepFut = this.prepFut;
+ GridDhtTxPrepareFuture prep = prepFut;
- if (prepFut != null) {
- prepFut.complete();
+ if (prep != null) {
+ prep.complete();
- prepFut.listen((IgniteInternalFuture<?> f) -> finishTx(false, f,
fut));
+ prep.listen(() -> finishTx(false, prep, fut));
}
else
finishTx(false, null, fut);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index acc22d8b865..0105abc103b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -192,7 +192,7 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
if (fut.initialVersion().after(topVer) || (fut.exchangeActions()
!= null && fut.exchangeActions().hasStop()))
fut = cctx.shared().exchange().lastFinishedFuture();
else {
- fut.listen((IgniteInternalFuture<AffinityTopologyVersion>
fut0) -> {
+ fut.listen(fut0 -> {
if (fut0.error() != null)
onDone(fut0.error());
else
@@ -287,9 +287,9 @@ public class GridPartitionedGetFuture<K, V> extends
CacheDistributedGetFutureAda
}
// Add new future.
- add(fut0.chain(f -> {
+ add(fut0.chain(() -> {
try {
- return createResultMap(f.get());
+ return createResultMap(fut0.get());
}
catch (Exception e) {
U.error(log, "Failed to get values from dht cache
[fut=" + fut0 + "]", e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 148b0f9bbdb..074ee4fe51c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -315,9 +315,9 @@ public class GridPartitionedSingleGetFuture extends
GridCacheFutureAdapter<Objec
map(updTopVer);
}
else {
- fut0.listen(f -> {
+ fut0.listen(() -> {
try {
- GridCacheEntryInfo info = f.get();
+ GridCacheEntryInfo info = fut0.get();
setResult(info);
}
@@ -893,9 +893,9 @@ public class GridPartitionedSingleGetFuture extends
GridCacheFutureAdapter<Objec
IgniteInternalFuture<AffinityTopologyVersion>
awaitTopologyVersionFuture =
cctx.shared().exchange().affinityReadyFuture(topVer);
- awaitTopologyVersionFuture.listen(f -> {
+ awaitTopologyVersionFuture.listen(() -> {
try {
- remap(f.get());
+ remap(awaitTopologyVersionFuture.get());
}
catch (IgniteCheckedException e) {
onDone(e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index cd9f3b9714c..33d348105e3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1519,7 +1519,7 @@ public final class GridDhtColocatedLockFuture extends
GridCacheCompoundIdentityF
IgniteInternalFuture<TxDeadlock> fut =
cctx.tm().detectDeadlock(tx, keys);
- fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> {
+ fut.listen(() -> {
try {
TxDeadlock deadlock = fut.get();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 0ecf17eeee8..4b807222185 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -398,7 +398,7 @@ public class GridDhtPartitionDemander {
final RebalanceFuture fut = new RebalanceFuture(grp,
lastExchangeFut, assignments, log, rebalanceId, next, lastCancelledTime);
if (oldFut.isInitial())
- fut.listen(f -> oldFut.onDone(f.result()));
+ fut.listen(() -> oldFut.onDone(fut.result()));
if (forcedRebFut != null)
forcedRebFut.add(fut);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 85957f22af9..eaf90a36127 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -129,6 +129,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
+
import static java.util.Collections.emptySet;
import static java.util.stream.Stream.concat;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT;
@@ -2529,7 +2530,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
// Should execute this listener first, before any external listeners.
// Listeners use stack as data structure.
- listen(f -> {
+ listen(() -> {
// Update last finished future in the first.
cctx.exchange().lastFinishedFuture(this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index c70471a8214..872e4130707 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -595,7 +595,7 @@ public class ExchangeLatchManager {
permits = new AtomicInteger(participants.size());
// Send final acks when latch is completed.
- complete.listen(f -> {
+ complete.listen(() -> {
for (ClusterNode node : participants)
sendAck(node.id(), latchId(), true);
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
index c7110a351cf..6a718988188 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
@@ -300,7 +300,7 @@ public class PartitionsEvictManager extends
GridCacheSharedManagerAdapter {
GridFutureAdapter<?> fut = task.finishFut;
- fut.listen(f -> {
+ fut.listen(() -> {
synchronized (this) {
taskInProgress--;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index e363f695c81..9915f08070a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -269,9 +269,9 @@ public final class GridNearGetFuture<K, V> extends
CacheDistributedGetFutureAdap
}
// Add new future.
- add(fut.chain(f -> {
+ add(fut.chain(() -> {
try {
- return loadEntries(n.id(), mappedKeys.keySet(),
f.get(), saved, topVer);
+ return loadEntries(n.id(), mappedKeys.keySet(),
fut.get(), saved, topVer);
}
catch (Exception e) {
U.error(log, "Failed to get values from dht cache
[fut=" + fut + "]", e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index a75c062427d..2483e42d9f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -910,7 +910,7 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
markInitialized();
}
else {
- fut.listen((IgniteInternalFuture<AffinityTopologyVersion>
fut0) -> {
+ fut.listen(() -> {
try {
fut.get();
@@ -1466,7 +1466,7 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
IgniteInternalFuture<TxDeadlock> fut =
cctx.tm().detectDeadlock(tx, keys);
- fut.listen((IgniteInternalFuture<TxDeadlock> fut0) -> {
+ fut.listen(() -> {
try {
TxDeadlock deadlock = fut.get();
@@ -1635,9 +1635,9 @@ public final class GridNearLockFuture extends
GridCacheCompoundIdentityFuture<Bo
if (!affFut.isDone()) {
// TODO FIXME
https://ggsystems.atlassian.net/browse/GG-23288
- affFut.listen((IgniteInternalFuture<?> fut) -> {
+ affFut.listen(() -> {
try {
- fut.get();
+ affFut.get();
remap();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
index fa440d0b65e..e2e9bddfe11 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java
@@ -157,9 +157,9 @@ public abstract class GridNearTxAbstractEnlistFuture<T>
extends GridCacheCompoun
// Terminate this future if parent future is terminated by
rollback.
if (!fut.isDone()) {
- fut.listen((IgniteInternalFuture fut0) -> {
- if (fut0.error() != null)
- onDone(fut0.error());
+ fut.listen(() -> {
+ if (fut.error() != null)
+ onDone(fut.error());
});
}
else if (fut.error() != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 4cd99d6ac35..4df7c947b05 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -31,7 +31,6 @@ import
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
@@ -500,7 +499,7 @@ public class GridNearTxEnlistFuture extends
GridNearTxAbstractEnlistFuture<GridC
updateLocalFuture(fut);
- fut.listen((IgniteInternalFuture<GridCacheReturn> fut0) -> {
+ fut.listen(() -> {
try {
clearLocalFuture(fut);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 79340a08b07..cd28b6614bb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -431,9 +431,9 @@ public final class GridNearTxFinishFuture<K, V> extends
GridCacheCompoundIdentit
}
}
- curFut.listen((IgniteInternalFuture<?> fut) -> {
+ curFut.listen(() -> {
try {
- fut.get();
+ curFut.get();
rollbackAsyncSafe(onTimeout);
}
@@ -599,7 +599,7 @@ public final class GridNearTxFinishFuture<K, V> extends
GridCacheCompoundIdentit
IgniteInternalFuture<?> fut =
cctx.tm().remoteTxFinishFuture(nearXidVer);
- fut.listen((IgniteInternalFuture<?> fut0)
-> mini.onDone(tx));
+ fut.listen(() -> mini.onDone(tx));
return;
}
@@ -1007,7 +1007,7 @@ public final class GridNearTxFinishFuture<K, V> extends
GridCacheCompoundIdentit
if (backup.isLocal()) {
IgniteInternalFuture<?> fut =
cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
- fut.listen((IgniteInternalFuture<?>
fut0) -> mini.onDhtFinishResponse(cctx.localNodeId()));
+ fut.listen(() ->
mini.onDhtFinishResponse(cctx.localNodeId()));
}
else {
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index ca5b56bd7bb..7e218c68581 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1102,9 +1102,9 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
recovery,
expiryPlc);
- loadFut.listen((IgniteInternalFuture<Void> fut) -> {
+ loadFut.listen(() -> {
try {
- fut.get();
+ loadFut.get();
finishFuture(enlistFut, null, true);
}
@@ -1298,9 +1298,9 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
recovery,
expiryPlc);
- loadFut.listen((IgniteInternalFuture<Void> fut) -> {
+ loadFut.listen(() -> {
try {
- fut.get();
+ loadFut.get();
finishFuture(enlistFut, null, true);
}
@@ -3109,7 +3109,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
expiryPlc0,
skipVals,
needVer)
- .chain((IgniteInternalFuture<Map<Object, Object>> f) -> {
+ .chain(f -> {
try {
Map<Object, Object> map = f.get();
@@ -3142,7 +3142,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
true,
this)
.multi()
- .chain((fut) -> {
+ .chain(fut -> {
try {
Map<Object, Object> map = fut.get();
@@ -3181,7 +3181,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
recovery,
null,
label()
- ).chain((IgniteInternalFuture<Object> f) -> {
+ ).chain(f -> {
try {
Object val = f.get();
@@ -3211,7 +3211,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
/*keepCacheObject*/true,
label(),
null
- ).chain((IgniteInternalFuture<Map<Object, Object>> f) -> {
+ ).chain(f -> {
try {
Map<Object, Object> map = f.get();
@@ -3871,7 +3871,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
return prepFut;
if (trackTimeout)
- prepFut.listen((IgniteInternalFuture<?> f) ->
removeTimeoutHandler());
+ prepFut.listen(this::removeTimeoutHandler);
if (timeout == -1) {
fut.onDone(this, timeoutException());
@@ -3947,7 +3947,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
((GridFutureAdapter<IgniteInternalTx>)prepFut).onDone(t);
}
- prepareFut.listen((IgniteInternalFuture<?> f) -> {
+ prepareFut.listen(f -> {
// These values should not be changed after set once.
prepareTime.compareAndSet(0, System.nanoTime() -
prepareStartTime.get());
@@ -4113,7 +4113,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
if (!commit) {
final GridNearTxFinishFuture rollbackFut = new
GridNearTxFinishFuture<>(cctx, this, false);
- fut.listen((IgniteInternalFuture<IgniteInternalTx> fut0) -> {
+ fut.listen(() -> {
if (FINISH_FUT_UPD.compareAndSet(tx, fut, rollbackFut)) {
switch (tx.state()) {
case COMMITTED:
@@ -4134,9 +4134,9 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
}
}
else {
-
finishFut.listen((IgniteInternalFuture<IgniteInternalTx> f) -> {
+ finishFut.listen(() -> {
try {
- f.get();
+ finishFut.get();
rollbackFut.markInitialized();
}
@@ -4152,7 +4152,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
else {
final GridFutureAdapter<IgniteInternalTx> fut0 = new
GridFutureAdapter<>();
- fut.listen((IgniteInternalFuture<IgniteInternalTx> f) -> {
+ fut.listen(() -> {
if (timedOut())
fut0.onDone(new
IgniteTxTimeoutCheckedException("Failed to commit transaction, " +
"transaction is concurrently rolled back on
timeout: " + tx));
@@ -4249,7 +4249,7 @@ public class GridNearTxLocal extends
GridDhtTxLocalAdapter implements GridTimeou
if (fut.isDone())
cctx.tm().mvccFinish(this);
else
- fut.listen(f -> cctx.tm().mvccFinish(this));
+ fut.listen(() -> cctx.tm().mvccFinish(this));
return fut;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
index f9c021db5b7..b541ae1121b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryEnlistFuture.java
@@ -167,7 +167,7 @@ public class GridNearTxQueryEnlistFuture extends
GridNearTxQueryAbstractEnlistFu
updateLocalFuture(locFut);
- locFut.listen((IgniteInternalFuture<Long> fut) -> {
+ locFut.listen(fut -> {
assert fut.error() != null || fut.result() != null :
fut;
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index 6a8b722d1cf..9936211fa8b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -460,7 +459,7 @@ public class GridNearTxQueryResultsEnlistFuture extends
GridNearTxQueryAbstractE
updateLocalFuture(fut);
- fut.listen((IgniteInternalFuture<Long> fut0) -> {
+ fut.listen(() -> {
assert fut.error() != null || fut.result() != null : fut;
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
index 1944bff85ff..9d9090140c3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairAbstractFuture.java
@@ -249,10 +249,10 @@ public abstract class GridNearReadRepairAbstractFuture
extends GridFutureAdapter
if (REMAP_CALLS_UPD.compareAndSet(this, 0, 1)) {
GridNearReadRepairAbstractFuture fut = remapFuture(topVer);
- fut.listen(f -> {
+ fut.listen(() -> {
assert !isDone();
- onDone(f.result(), f.error());
+ onDone(fut.result(), fut.error());
});
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
index 80afb606533..547da85ca0e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairCheckOnlyFuture.java
@@ -236,7 +236,7 @@ public class GridNearReadRepairCheckOnlyFuture extends
GridNearReadRepairAbstrac
* @return Future represents 1 entry's value.
*/
public <K, V> IgniteInternalFuture<V> single() {
- return init().chain((fut) -> {
+ return init().chain(fut -> {
try {
final Map<K, V> map = new IgniteBiTuple<>();
@@ -262,7 +262,7 @@ public class GridNearReadRepairCheckOnlyFuture extends
GridNearReadRepairAbstrac
* @return Future represents entries map.
*/
public <K, V> IgniteInternalFuture<Map<K, V>> multi() {
- return init().chain((fut) -> {
+ return init().chain(fut -> {
try {
final Map<K, V> map = U.newHashMap(keys.size());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
index 26182ee40aa..0231f89b529 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/consistency/GridNearReadRepairFuture.java
@@ -136,7 +136,7 @@ public class GridNearReadRepairFuture extends
GridNearReadRepairAbstractFuture {
assert !correctedMap.isEmpty(); // Check failed on the same
data.
- tx.finishFuture().listen(future -> {
+ tx.finishFuture().listen(() -> {
TransactionState state = tx.state();
if (state == TransactionState.COMMITTED) // Explicit tx
may fix the values but become rolled back later.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index cfc98852b76..fd81372fa47 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -742,7 +742,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter
implements MvccProce
=
ctx.cache().context().deadlockDetectionMgr().initDelayedComputation(waiterVer,
blockerVer);
if (delayedComputation != null)
- fut.listen(fut0 -> delayedComputation.cancel());
+ fut.listen(delayedComputation::cancel);
}
return fut;
@@ -1330,11 +1330,11 @@ public class MvccProcessorImpl extends
GridProcessorAdapter implements MvccProce
res0.markInitialized();
- res0.listen(future -> {
+ res0.listen(() -> {
VacuumMetrics metrics = null; Throwable ex
= null;
try {
- metrics = future.get();
+ metrics = res0.get();
txLog.removeUntil(snapshot.coordinatorVersion(), snapshot.cleanupVersion());
@@ -2216,7 +2216,7 @@ public class MvccProcessorImpl extends
GridProcessorAdapter implements MvccProce
break;
case MOVING:
-
task.part().group().preloader().rebalanceFuture().listen(f ->
cleanupQueue.add(task));
+
task.part().group().preloader().rebalanceFuture().listen(() ->
cleanupQueue.add(task));
break;
case OWNING:
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
index 89a23005f59..aedae40a69c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTrackerImpl.java
@@ -268,7 +268,7 @@ public class MvccQueryTrackerImpl implements
MvccQueryTracker {
if (readyFut.isDone())
onAffinityReady(readyFut, lsnr);
else
- readyFut.listen(fut -> onAffinityReady(fut, lsnr));
+ readyFut.listen(() -> onAffinityReady(readyFut, lsnr));
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index aeee241b98f..d6f8246daae 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1488,7 +1488,7 @@ public class GridCacheDatabaseSharedManager extends
IgniteCacheDatabaseSharedMan
);
if (rebuildFut != null)
- rebuildFut.listen(fut ->
rebuildIndexesCompleteCntr.countDown(true));
+ rebuildFut.listen(() ->
rebuildIndexesCompleteCntr.countDown(true));
else
rebuildIndexesCompleteCntr.countDown(false);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java
index 8c940141eca..ea9d933b700 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointContextImpl.java
@@ -103,7 +103,7 @@ public class CheckpointContextImpl implements
CheckpointListener.Context {
try {
GridFutureAdapter<?> res = new GridFutureAdapter<>();
- res.listen(fut -> heartbeatUpdater.updateHeartbeat());
+ res.listen(heartbeatUpdater::updateHeartbeat);
asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
index f45c48bcb02..196954b5558 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointProgressImpl.java
@@ -22,12 +22,10 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.NotNull;
import static
org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
@@ -248,9 +246,9 @@ public class CheckpointProgressImpl implements
CheckpointProgress {
/** {@inheritDoc} */
@Override public void onStateChanged(CheckpointState state, Runnable clo) {
- GridFutureAdapter<?> fut0 = futureFor(state);
+ GridFutureAdapter<?> fut = futureFor(state);
- fut0.listen((IgniteInClosure<IgniteInternalFuture>)fut -> {
+ fut.listen(() -> {
if (fut.error() == null)
clo.run();
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
index 00c680d3bcb..556c990ded2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java
@@ -210,12 +210,12 @@ public class CachePartitionDefragmentationManager {
new LinkedBlockingQueue<>()
);
- completionFut.chain(future -> {
+ completionFut.chain(() -> {
linkMapByPart.values().forEach(LinkMap::close);
linkMapByPart.clear();
- return future.result();
+ return completionFut.result();
});
}
@@ -421,7 +421,7 @@ public class CachePartitionDefragmentationManager {
PageStore oldIdxPageStore =
filePageStoreMgr.getStore(grpId, INDEX_PARTITION);
- idxDfrgFut = idxDfrgFut.chain(fut -> {
+ idxDfrgFut = idxDfrgFut.chain(() -> {
if (log.isDebugEnabled()) {
log.debug(S.toString(
"Index partition defragmented",
@@ -628,7 +628,7 @@ public class CachePartitionDefragmentationManager {
/** */
public IgniteInternalFuture<?> completionFuture() {
- return completionFut.chain(future -> null);
+ return completionFut.chain(() -> null);
}
/** */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e9636db1500..157a1ea5c57 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -1158,9 +1158,9 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
- return task0.chain(fut -> {
- if (fut.error() != null)
- throw F.wrap(fut.error());
+ return task0.chain(() -> {
+ if (task0.error() != null)
+ throw F.wrap(task0.error());
try {
Set<String> blts = req.nodes().stream()
@@ -1171,7 +1171,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
snpDir.mkdirs();
- SnapshotFutureTaskResult res =
(SnapshotFutureTaskResult)fut.result();
+ SnapshotFutureTaskResult res =
(SnapshotFutureTaskResult)task0.result();
SnapshotMetadata meta = new SnapshotMetadata(req.requestId(),
req.snapshotName(),
@@ -1224,7 +1224,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
activeTxsFut.markInitialized();
- activeTxsFut.listen(f -> wrapMsgsFut.onDone());
+ activeTxsFut.listen(() -> wrapMsgsFut.onDone());
}
if (cctx.kernalContext().clientNode())
@@ -1374,7 +1374,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
if (cctx.kernalContext().clientNode())
return (IgniteInternalFuture<SnapshotOperationResponse>)prepFut;
- return prepFut.chain(r -> {
+ return prepFut.chain(() -> {
try {
if (req.error() != null) {
snpReq.error(req.error());
@@ -1644,7 +1644,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** {@inheritDoc} */
@Override public IgniteFuture<Void> cancelSnapshot(String name) {
- return new IgniteFutureImpl<>(cancelSnapshot0(name).chain(f -> null));
+ return new IgniteFutureImpl<>(cancelSnapshot0(name).chain(() -> null));
}
/**
@@ -1959,7 +1959,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
});
if (log.isInfoEnabled()) {
- res.listen(f -> log.info("The check snapshot procedure finished
[snpName=" + name +
+ res.listen(() -> log.info("The check snapshot procedure finished
[snpName=" + name +
", snpPath=" + snpPath + ", incIdx=" + incIdx + ", grps=" +
grps + ']'));
}
@@ -2236,14 +2236,14 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
List<ClusterNode> srvNodes =
cctx.discovery().serverNodes(AffinityTopologyVersion.NONE);
- snpFut0.listen(f -> {
- if (f.error() == null)
+ snpFut0.listen(() -> {
+ if (snpFut0.error() == null)
recordSnapshotEvent(name, SNAPSHOT_FINISHED_MSG + grps,
EVT_CLUSTER_SNAPSHOT_FINISHED);
else {
- String errMsgPref = f.error() instanceof
SnapshotWarningException ? SNAPSHOT_FINISHED_WRN_MSG
+ String errMsgPref = snpFut0.error() instanceof
SnapshotWarningException ? SNAPSHOT_FINISHED_WRN_MSG
: SNAPSHOT_FAILED_MSG;
- recordSnapshotEvent(name, errMsgPref +
f.error().getMessage(), EVT_CLUSTER_SNAPSHOT_FAILED);
+ recordSnapshotEvent(name, errMsgPref +
snpFut0.error().getMessage(), EVT_CLUSTER_SNAPSHOT_FAILED);
}
});
@@ -2696,7 +2696,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
", topVer=" + cctx.discovery().topologyVersionEx() + ']');
}
- task.listen(f -> locSnpTasks.remove(rqId));
+ task.listen(() -> locSnpTasks.remove(rqId));
return task;
}
@@ -3671,7 +3671,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
RemoteSnapshotFilesRecevier curr = active;
if (curr == null || curr.isDone()) {
- next.listen(f -> scheduleNext());
+ next.listen(this::scheduleNext);
active = next;
@@ -3780,17 +3780,17 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
rmtSndrFactory.apply(rqId, nodeId),
reqMsg0.parts()));
- task.listen(f -> {
- if (f.error() == null)
+ task.listen(() -> {
+ if (task.error() == null)
return;
U.error(log, "Failed to process request of
creating a snapshot " +
- "[from=" + nodeId + ", msg=" + reqMsg0 + ']',
f.error());
+ "[from=" + nodeId + ", msg=" + reqMsg0 + ']',
task.error());
try {
cctx.gridIO().sendToCustomTopic(nodeId,
DFLT_INITIAL_SNAPSHOT_TOPIC,
- new
SnapshotFilesFailureMessage(reqMsg0.id(), f.error().getMessage()),
+ new
SnapshotFilesFailureMessage(reqMsg0.id(), task.error().getMessage()),
SYSTEM_POOL);
}
catch (IgniteCheckedException ex0) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
index e9be26974f7..1ffb31fe9a5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java
@@ -131,9 +131,9 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
return false;
}
- highPtrFut.chain(fut -> {
- if (fut.error() != null) {
- onDone(fut.error());
+ highPtrFut.chain(() -> {
+ if (highPtrFut.error() != null) {
+ onDone(highPtrFut.error());
return null;
}
@@ -141,7 +141,7 @@ class IncrementalSnapshotFutureTask extends
AbstractSnapshotFutureTask<Void> imp
try {
String folderName =
cctx.kernalContext().pdsFolderResolver().resolveFolders().folderName();
- copyWal(incrementalSnapshotWalsDir(incSnpDir, folderName),
fut.result());
+ copyWal(incrementalSnapshotWalsDir(incSnpDir, folderName),
highPtrFut.result());
copyFiles(
MarshallerContextImpl.mappingFileStoreWorkDir(cctx.gridConfig().getWorkDirectory()),
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMarkWalFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMarkWalFuture.java
index f0574f9f89a..9035548730f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMarkWalFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotMarkWalFuture.java
@@ -106,11 +106,11 @@ class IncrementalSnapshotMarkWalFuture extends
GridFutureAdapter<WALPointer> {
checkFut.markInitialized();
- checkFut.listen(finish -> {
+ checkFut.listen(() -> {
if (isDone())
return;
- if (Boolean.FALSE.equals(finish.result())) {
+ if (Boolean.FALSE.equals(checkFut.result())) {
onDone(new IgniteCheckedException("Incremental snapshot is
inconsistent [id=" + id + ']'));
return;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
index 7fc31dbdfb7..793316949f2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java
@@ -293,7 +293,7 @@ class SnapshotFutureTask extends
AbstractSnapshotFutureTask<SnapshotFutureTaskRe
log);
}
- startedFut.listen(f ->
+ startedFut.listen(() ->
((GridCacheDatabaseSharedManager)cctx.database()).removeCheckpointListener(this)
);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
index f16543a4fa4..074a1b7227e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java
@@ -308,11 +308,11 @@ public class SnapshotRestoreProcess {
return new IgniteFinishedFutureImpl<>(e);
}
- fut0.listen(f -> {
- if (f.error() != null) {
+ fut0.listen(() -> {
+ if (fut0.error() != null) {
snpMgr.recordSnapshotEvent(
snpName,
- OP_FAILED_MSG + ": " + f.error().getMessage() + " [reqId="
+ fut0.rqId + "].",
+ OP_FAILED_MSG + ": " + fut0.error().getMessage() + "
[reqId=" + fut0.rqId + "].",
EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FAILED
);
}
@@ -586,7 +586,7 @@ public class SnapshotRestoreProcess {
interrupt(opCtx0, reason);
return fut0 == null ? new IgniteFinishedFutureImpl<>(ctxStop) :
- new IgniteFutureImpl<>(fut0.chain(f -> true));
+ new IgniteFutureImpl<>(fut0.chain(() -> true));
}
/**
@@ -973,7 +973,7 @@ public class SnapshotRestoreProcess {
IgniteSnapshotManager snpMgr = ctx.cache().context().snapshotMgr();
synchronized (this) {
- opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f ->
null));
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(() ->
null));
}
if (log.isInfoEnabled()) {
@@ -1671,7 +1671,7 @@ public class SnapshotRestoreProcess {
GridFutureAdapter<Boolean> retFut = new GridFutureAdapter<>();
synchronized (this) {
- opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(f -> null));
+ opCtx0.stopFut = new IgniteFutureImpl<>(retFut.chain(() -> null));
}
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index deaa70f0f1f..16d71f80152 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -380,9 +380,9 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
initLocalListener(locLsnr, ctx);
if (initFut == null) {
- initFut = p2pUnmarshalFut.chain((fut) -> {
+ initFut = p2pUnmarshalFut.chain(() -> {
try {
- fut.get();
+ p2pUnmarshalFut.get();
initRemoteFilter(getEventFilter0(), ctx);
@@ -705,7 +705,7 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
RegisterStatus regStatus = mgr.registerListener(routineId, lsnr,
internal);
if (regStatus == RegisterStatus.REGISTERED)
- initFut.listen(res -> sendQueryExecutedEvent());
+ initFut.listen(this::sendQueryExecutedEvent);
return regStatus;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b3d93189a9f..e60dc5dcc37 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -335,9 +335,9 @@ public class IgniteTxHandler {
IgniteInternalFuture<GridNearTxPrepareResponse> fut =
locTx.prepareAsyncLocal(req);
- return fut.chain((IgniteInternalFuture<GridNearTxPrepareResponse> f)
-> {
+ return fut.chain(() -> {
try {
- return f.get();
+ return fut.get();
}
catch (Exception e) {
locTx.setRollbackOnly(); // Just in case.
@@ -618,9 +618,9 @@ public class IgniteTxHandler {
final GridDhtTxLocal tx0 = tx;
- fut.listen((IgniteInternalFuture<?> txFut) -> {
+ fut.listen(() -> {
try {
- txFut.get();
+ fut.get();
}
catch (IgniteCheckedException e) {
tx0.setRollbackOnly(); // Just in case.
@@ -1284,7 +1284,7 @@ public class IgniteTxHandler {
final GridDhtTxRemote dhtTx0 = dhtTx;
final GridNearTxRemote nearTx0 = nearTx;
- completeFut.listen((IgniteInternalFuture<IgniteInternalTx>
fut) ->
+ completeFut.listen(() ->
sendReply(nodeId, req, res0, dhtTx0, nearTx0));
}
else
@@ -1360,7 +1360,7 @@ public class IgniteTxHandler {
else {
IgniteInternalFuture<?> fut =
ctx.tm().remoteTxFinishFuture(req.version());
- fut.listen((IgniteInternalFuture<?> f) ->
sendReply(nodeId, req, true, null));
+ fut.listen(() -> sendReply(nodeId, req, true, null));
}
return;
@@ -1402,7 +1402,7 @@ public class IgniteTxHandler {
IgniteInternalFuture<IgniteInternalTx> completeFut =
completeFuture(dhtTx, nearTx);
if (completeFut != null) {
- completeFut.listen((IgniteInternalFuture<IgniteInternalTx>
fut) ->
+ completeFut.listen(() ->
sendReply(nodeId, req, true, nearTxId));
}
else
@@ -2143,7 +2143,7 @@ public class IgniteTxHandler {
sendCheckPreparedResponse(nodeId, req, prepared);
}
else {
- fut.listen((IgniteInternalFuture<Boolean> f) -> {
+ fut.listen(() -> {
boolean prepared;
try {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 296823e3933..61245b29933 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1710,7 +1710,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
setRollbackOnly();
if (commit && commitAfterLock())
- return
rollbackAsync().chain((IgniteInternalFuture<IgniteInternalTx> f) -> {
+ return rollbackAsync().chain(() -> {
throw new GridClosureException(e);
});
@@ -1727,7 +1727,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
);
if (commit && commitAfterLock())
- return
rollbackAsync().chain((IgniteInternalFuture<IgniteInternalTx> f) -> {
+ return rollbackAsync().chain(() -> {
throw ex;
});
@@ -1758,7 +1758,7 @@ public abstract class IgniteTxLocalAdapter extends
IgniteTxAdapter implements Ig
}
catch (final IgniteCheckedException ex) {
if (commit && commitAfterLock())
- return
rollbackAsync().chain((IgniteInternalFuture<IgniteInternalTx> f) -> {
+ return rollbackAsync().chain(() -> {
throw new GridClosureException(ex);
});
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 1ee1bb38cc5..29b61efb7eb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -897,7 +897,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
final GridCompoundFuture finishAllTxsFut = new
CacheObjectsReleaseFuture("AllTx", topVer);
// After finishing all local updates, wait for finishing all tx
updates on backups.
- finishLocTxsFut.listen(future -> {
+ finishLocTxsFut.listen(() -> {
finishAllTxsFut.add(cctx.mvcc().finishRemoteTxs(topVer));
finishAllTxsFut.markInitialized();
});
@@ -2168,7 +2168,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
if (log.isDebugEnabled())
log.debug("Found near transaction, will wait for completion: "
+ tx);
- tx.finishFuture().listen((IgniteInternalFuture<IgniteInternalTx>
fut) -> {
+ tx.finishFuture().listen(() -> {
TransactionState state = tx.state();
if (log.isDebugEnabled())
@@ -2247,7 +2247,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
final Collection<GridCacheVersion> processedVers0 =
processedVers;
- prepFut.listen((IgniteInternalFuture<?> ignored) -> {
+ prepFut.listen(() -> {
if (log.isDebugEnabled())
log.debug("Transaction prepare future finished: "
+ tx);
@@ -3249,7 +3249,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> prepFut =
tx.currentPrepareFuture();
if (prepFut != null) {
- prepFut.listen(fut -> {
+ prepFut.listen(() -> {
if (tx.state() == PREPARED)
processPrepared(tx, evtNodeId);
// If we could not mark tx as
rollback, it means that transaction is being committed.
@@ -3276,7 +3276,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
doneFut.markInitialized();
if (log.isInfoEnabled() && preparedTxCnt.get() > 0)
- doneFut.listen(fut -> finishAndRecordTimings());
+ doneFut.listen(this::finishAndRecordTimings);
if (allTxFinFut == null)
return;
@@ -3284,7 +3284,7 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
allTxFinFut.markInitialized();
// Send vote to mvcc coordinator when all recovering
transactions have finished.
- allTxFinFut.listen(fut -> {
+ allTxFinFut.listen(() -> {
// If mvcc coordinator issued snapshot for recovering
transaction has failed during recovery,
// then there is no need to send messages to new
coordinator.
try {
@@ -3706,9 +3706,9 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> recInitFut =
cctx.kernalContext().closure().runLocalSafe(
new TxRecoveryInitRunnable(evt.eventNode(),
cctx.coordinators().currentCoordinator()));
- recInitFut.listen(future -> {
- if (future.error() != null)
- cctx.kernalContext().failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, future.error()));
+ recInitFut.listen(() -> {
+ if (recInitFut.error() != null)
+ cctx.kernalContext().failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, recInitFut.error()));
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index da485b02294..8078bbe1f53 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -81,7 +81,6 @@ import
org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -333,7 +332,7 @@ public class GridClusterStateProcessor extends
GridProcessorAdapter implements I
if (fut != null) {
if (asyncWaitForTransition) {
- return new
IgniteFutureImpl<>(fut.chain((C1<IgniteInternalFuture<Void>, ClusterState>)f ->
{
+ return new IgniteFutureImpl<>(fut.chain(() -> {
ClusterState res = globalState.transitionResult();
assert res != null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
index bb973857e8a..2fa811ea071 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
@@ -123,7 +123,7 @@ public class DmsDataWriterWorker extends GridWorker {
updateQueue.offer((RunnableFuture<?>)(suspendFut = new
FutureTask<>(() -> AWAIT)));
- compFut.listen(f -> latch.countDown());
+ compFut.listen(() -> latch.countDown());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 2126f64872e..e26ebddbcf1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -221,8 +221,8 @@ public class ClientListenerNioListener extends
GridNioServerListenerAdapter<Clie
GridNioFuture<?> fut = ses.send(parser.encode(resp));
- fut.listen(f -> {
- if (f.error() == null)
+ fut.listen(() -> {
+ if (fut.error() == null)
resp.onSent();
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
index f1525fa3f4a..e1a392e47e8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/compute/ClientComputeTask.java
@@ -123,21 +123,21 @@ class ClientComputeTask implements
ClientCloseableResource {
void onResponseSent() {
// Listener should be registered only after response for this task was
sent, to ensure that client doesn't
// receive notification before response for the task.
- taskFut.listen(f -> {
+ taskFut.listen(() -> {
try {
ClientNotification notification;
- if (f.error() != null) {
+ if (taskFut.error() != null) {
String msg =
ctx.kernalContext().clientListener().sendServerExceptionStackTraceToClient()
- ? f.error().getMessage() + U.nl() +
X.getFullStackTrace(f.error())
- : f.error().getMessage();
+ ? taskFut.error().getMessage() + U.nl() +
X.getFullStackTrace(taskFut.error())
+ : taskFut.error().getMessage();
notification = new
ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, msg);
}
- else if (f.isCancelled())
+ else if (taskFut.isCancelled())
notification = new
ClientNotification(OP_COMPUTE_TASK_FINISHED, taskId, "Task was cancelled");
else
- notification = new
ClientObjectNotification(OP_COMPUTE_TASK_FINISHED, taskId, f.result());
+ notification = new
ClientObjectNotification(OP_COMPUTE_TASK_FINISHED, taskId, taskFut.result());
ctx.notifyClient(notification);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index b30ba5ca6b0..65ae6ae9774 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -44,6 +44,8 @@ import
org.apache.ignite.internal.processors.task.TaskExecutionOptions;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils.getResult;
@@ -487,16 +489,31 @@ public class PlatformCompute extends
PlatformAbstractTarget {
});
}
+ /** {@inheritDoc} */
+ @Override public void listen(final IgniteRunnable lsnr) {
+ listen(ignored -> lsnr.run());
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture chain(IgniteClosure doneCb) {
throw new UnsupportedOperationException("Chain operation is not
supported.");
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture chain(IgniteOutClosure doneCb) {
+ throw new UnsupportedOperationException("Chain operation is not
supported.");
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture chain(IgniteClosure doneCb,
Executor exec) {
throw new UnsupportedOperationException("Chain operation is not
supported.");
}
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture chain(IgniteOutClosure doneCb,
Executor exec) {
+ throw new UnsupportedOperationException("Chain operation is not
supported.");
+ }
+
/** {@inheritDoc} */
@Override public IgniteInternalFuture chainCompose(IgniteClosure
doneCb) {
throw new UnsupportedOperationException("Chain compose operation
is not supported.");
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7d5e3e244dd..b3415101a40 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2634,8 +2634,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
if (log.isInfoEnabled())
log.info("Started indexes rebuilding for cache " + cacheInfo);
- idxFut.listen(fut -> {
- Throwable err = fut.error();
+ idxFut.listen(() -> {
+ Throwable err = idxFut.error();
if (isNull(err) && log.isInfoEnabled())
log.info("Finished indexes rebuilding for cache " +
cacheInfo);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index 3bad5315ae3..0e3e1f3df25 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -720,7 +720,7 @@ public class RunningQueryManager {
}
if (!msg.asyncResponse())
- runningQryInfo.runningFuture().listen((f) ->
sendKillResponse(msg, node, f.result()));
+ runningQryInfo.runningFuture().listen(f ->
sendKillResponse(msg, node, f.result()));
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
index 7de4716668d..d00691f0fc4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/SchemaIndexCacheVisitorImpl.java
@@ -33,7 +33,6 @@ import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearCach
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -128,17 +127,14 @@ public class SchemaIndexCacheVisitorImpl implements
SchemaIndexCacheVisitor {
cctx.kernalContext().pools().buildIndexExecutorService().execute(worker);
}
- buildIdxCompoundFut.listen(fut -> {
- Throwable err = fut.error();
+ buildIdxCompoundFut.listen(() -> {
+ Throwable err = buildIdxCompoundFut.error();
if (isNull(err) && collectStat && log.isInfoEnabled()) {
try {
- GridCompoundFuture<SchemaIndexCacheStat,
SchemaIndexCacheStat> compoundFut =
- (GridCompoundFuture<SchemaIndexCacheStat,
SchemaIndexCacheStat>)fut;
-
SchemaIndexCacheStat resStat = new SchemaIndexCacheStat();
- compoundFut.futures().stream()
+ buildIdxCompoundFut.futures().stream()
.map(IgniteInternalFuture::result)
.filter(Objects::nonNull)
.forEach(resStat::accumulate);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
index 74690705f1e..0c804ece39f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/stat/IgniteStatisticsConfigurationManager.java
@@ -383,8 +383,8 @@ public class IgniteStatisticsConfigurationManager {
compoundFuture.markInitialized();
- compoundFuture.listen(future -> {
- if (future.error() == null && !future.result())
+ compoundFuture.listen(() -> {
+ if (compoundFuture.error() == null && !compoundFuture.result())
mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
index e33ee339fda..37828dd2474 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java
@@ -31,7 +31,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
import
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -44,7 +43,6 @@ import
org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.services.ServiceConfiguration;
import org.jetbrains.annotations.NotNull;
@@ -429,7 +427,7 @@ class ServiceDeploymentTask {
protected void onReceiveSingleDeploymentsMessage(UUID snd,
ServiceSingleNodeDeploymentResultBatch msg) {
assert depId.equals(msg.deploymentId()) : "Wrong message's deployment
process id, msg=" + msg;
- initCrdFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+ initCrdFut.listen(() -> {
if (isCompleted())
return;
@@ -454,7 +452,7 @@ class ServiceDeploymentTask {
protected void
onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBatch msg) {
assert depId.equals(msg.deploymentId()) : "Wrong message's deployment
process id, msg=" + msg;
- initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+ initTaskFut.listen(() -> {
if (isCompleted())
return;
@@ -698,7 +696,7 @@ class ServiceDeploymentTask {
* @param nodeId Left node id.
*/
protected void onNodeLeft(UUID nodeId) {
- initTaskFut.listen((IgniteInClosure<IgniteInternalFuture<?>>)fut -> {
+ initTaskFut.listen(() -> {
if (isCompleted())
return;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
index 6d02e66a5a7..fc4fb881aad 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/DistributedProcess.java
@@ -150,11 +150,11 @@ public class DistributedProcess<I extends Serializable, R
extends Serializable>
try {
IgniteInternalFuture<R> fut = exec.apply((I)msg.request());
- fut.listen(f -> {
- if (f.error() != null)
- p.resFut.onDone(f.error());
+ fut.listen(() -> {
+ if (fut.error() != null)
+ p.resFut.onDone(fut.error());
else
- p.resFut.onDone(f.result());
+ p.resFut.onDone(fut.result());
if (!ctx.clientNode() || p.waitClnRes) {
assert crd != null;
@@ -215,7 +215,7 @@ public class DistributedProcess<I extends Serializable, R
extends Serializable>
UUID leftNodeId = evt.eventNode().id();
for (Process p : processes.values()) {
- p.initFut.listen(fut -> {
+ p.initFut.listen(() -> {
if (F.eq(leftNodeId, p.crdId)) {
ClusterNode crd = coordinator();
@@ -231,7 +231,7 @@ public class DistributedProcess<I extends Serializable, R
extends Serializable>
initCoordinator(p, discoCache.version());
if (!ctx.clientNode() || p.waitClnRes)
- p.resFut.listen(f -> sendSingleMessage(p));
+ p.resFut.listen(() -> sendSingleMessage(p));
}
else if (F.eq(ctx.localNodeId(), p.crdId)) {
boolean isEmpty = false;
@@ -328,7 +328,7 @@ public class DistributedProcess<I extends Serializable, R
extends Serializable>
private void onSingleNodeMessageReceived(SingleNodeMessage<R> msg, UUID
nodeId) {
Process p = processes.computeIfAbsent(msg.processId(), id -> new
Process(msg.processId()));
- p.initCrdFut.listen(f -> {
+ p.initCrdFut.listen(() -> {
boolean isEmpty;
synchronized (mux) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index e3e2cb14c50..2b42c4bb2f5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -26,6 +26,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
/**
@@ -125,6 +127,13 @@ public class GridFinishedFuture<T> implements
IgniteInternalFuture<T> {
lsnr.apply(this);
}
+ /** {@inheritDoc} */
+ @Override public void listen(IgniteRunnable lsnr) {
+ assert lsnr != null;
+
+ lsnr.run();
+ }
+
/** {@inheritDoc} */
@Override public <R> IgniteInternalFuture<R> chain(final IgniteClosure<?
super IgniteInternalFuture<T>, R> doneCb) {
try {
@@ -138,29 +147,39 @@ public class GridFinishedFuture<T> implements
IgniteInternalFuture<T> {
}
}
+ /** {@inheritDoc} */
+ @Override public <R> IgniteInternalFuture<R> chain(final
IgniteOutClosure<R> doneCb) {
+ return chain(ignored -> doneCb.apply());
+ }
+
/** {@inheritDoc} */
@Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<?
super IgniteInternalFuture<T>, T1> doneCb, Executor exec) {
+ assert doneCb != null;
+
final GridFutureAdapter<T1> fut = new GridFutureAdapter<>();
- exec.execute(new Runnable() {
- @Override public void run() {
- try {
- fut.onDone(doneCb.apply(GridFinishedFuture.this));
- }
- catch (GridClosureException e) {
- fut.onDone(e.unwrap());
- }
- catch (RuntimeException | Error e) {
- fut.onDone(e);
+ exec.execute(() -> {
+ try {
+ fut.onDone(doneCb.apply(this));
+ }
+ catch (GridClosureException e) {
+ fut.onDone(e.unwrap());
+ }
+ catch (RuntimeException | Error e) {
+ fut.onDone(e);
- throw e;
- }
+ throw e;
}
});
return fut;
}
+ /** {@inheritDoc} */
+ @Override public <R> IgniteInternalFuture<R> chain(final
IgniteOutClosure<R> doneCb, Executor exec) {
+ return chain(ignored -> doneCb.apply(), exec);
+ }
+
/** {@inheritDoc} */
@Override public <R> IgniteInternalFuture<R> chainCompose(
IgniteClosure<? super IgniteInternalFuture<T>,
IgniteInternalFuture<R>> doneCb
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 89c806c8024..b49a494b792 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Async;
import org.jetbrains.annotations.Nullable;
@@ -356,15 +358,19 @@ public class GridFutureAdapter<R> implements
IgniteInternalFuture<R> {
}
/** {@inheritDoc} */
- @Override public <T> IgniteInternalFuture<T> chain(
- IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
- ) {
- ChainFuture<R, T> fut = new ChainFuture<>(this, doneCb, null);
+ @Async.Schedule
+ @Override public void listen(IgniteRunnable lsnr) {
+ listen(ignored -> lsnr.run());
+ }
- if (ignoreInterrupts)
- fut.ignoreInterrupts();
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super
IgniteInternalFuture<R>, T> doneCb) {
+ return chain(doneCb, null);
+ }
- return fut;
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(IgniteOutClosure<T>
doneCb) {
+ return chain(ignored -> doneCb.apply());
}
/** {@inheritDoc} */
@@ -380,6 +386,11 @@ public class GridFutureAdapter<R> implements
IgniteInternalFuture<R> {
return fut;
}
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(IgniteOutClosure<T>
doneCb, Executor exec) {
+ return chain(ignored -> doneCb.apply(), exec);
+ }
+
/** {@inheritDoc} */
@Override public <T> IgniteInternalFuture<T> chainCompose(
IgniteClosure<? super IgniteInternalFuture<R>,
IgniteInternalFuture<T>> doneCb
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 7b8e6c0be70..8ff6efc32a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -498,9 +498,9 @@ public class GridSelectorNioSessionImpl extends
GridNioSessionImpl implements Gr
GridNioFuture<Boolean> fut = super.close();
if (!fut.isDone()) {
- fut.listen(fut0 -> {
+ fut.listen(() -> {
try {
- fut0.get();
+ fut.get();
}
catch (IgniteCheckedException e) {
log.error("Failed to close session [ses=" +
GridSelectorNioSessionImpl.this + ']', e);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index 0fd9691bfb0..e9841d43fd9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -252,7 +252,7 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
long startTime = System.nanoTime();
- fut.listen(f ->
handshakeDuration.value(U.nanosToMillis(System.nanoTime() - startTime)));
+ fut.listen(() ->
handshakeDuration.value(U.nanosToMillis(System.nanoTime() - startTime)));
}
hnd.handshake();
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
index ada5c23fe60..42c21bbc5b5 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionClientPool.java
@@ -364,9 +364,9 @@ public class ConnectionClientPool {
ConnectionRequestFuture triggerFut = new ConnectionRequestFuture();
- triggerFut.listen(f -> {
+ triggerFut.listen(() -> {
try {
- fut0.onDone(f.get());
+ fut0.onDone(triggerFut.get());
}
catch (Throwable t) {
fut0.onDone(t);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
index 1557951332e..34235f4ae1e 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsDataRegionMetricsTest.java
@@ -296,7 +296,7 @@ public class IgnitePdsDataRegionMetricsTest extends
GridCommonAbstractTest {
IgniteInternalFuture chpBeginFut = psMgr.wakeupForCheckpoint(null);
- chpBeginFut.listen((f) -> {
+ chpBeginFut.listen(() -> {
load(ig);
metricsResult.onDone(new T2<>(
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
index d452ebb4d5d..fcde905bc8b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotRemoteRequestTest.java
@@ -107,7 +107,7 @@ public class IgniteSnapshotRemoteRequestTest extends
IgniteClusterSnapshotRestor
() -> false,
defaultPartitionConsumer(parts0, latch)));
- locFut.listen(f -> assertEquals("All partitions must be
handled: " + parts0,
+ locFut.listen(() -> assertEquals("All partitions must be
handled: " + parts0,
F.size(parts0.values(), Set::isEmpty), parts0.size()));
}
catch (IgniteCheckedException e) {
@@ -361,7 +361,7 @@ public class IgniteSnapshotRemoteRequestTest extends
IgniteClusterSnapshotRestor
.requestRemoteSnapshotFiles(sndNode, null, SNAPSHOT_NAME,
null, expParts, () -> false,
defaultPartitionConsumer(expParts, null));
- fut.listen(f -> expParts.values().forEach(integers ->
assertTrue(integers.isEmpty())));
+ fut.listen(() -> expParts.values().forEach(integers ->
assertTrue(integers.isEmpty())));
futs.add(fut);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
index 3c8f41abb84..cb1aa54eb98 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateAbstractTest.java
@@ -518,7 +518,7 @@ public abstract class TxPartitionCounterStateAbstractTest
extends GridCommonAbst
private GridFutureAdapter<?>
createSendFuture(TestRecordingCommunicationSpi wrapperSpi, Message msg) {
GridFutureAdapter<?> fut = new GridFutureAdapter<>();
- fut.listen(fut1 -> wrapperSpi.stopBlock(true, blockedMsg ->
+ fut.listen(() -> wrapperSpi.stopBlock(true, blockedMsg ->
blockedMsg.ioMessage().message() == msg, false, true));
return fut;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
index 62b62ba8cd3..97416cf6d55 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRecoveryWithConcurrentRollbackTest.java
@@ -394,7 +394,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends
GridCommonAbstractTest
// Doing only prepare to try to lock the key, commit is not
needed here.
p.tx().prepareNearTxLocal();
- p.tx().currentPrepareFuture().listen(fut ->
txPrepareLatch.countDown());
+
p.tx().currentPrepareFuture().listen(txPrepareLatch::countDown);
}
catch (Exception e) {
// No-op.
@@ -410,7 +410,7 @@ public class TxRecoveryWithConcurrentRollbackTest extends
GridCommonAbstractTest
p.tx().prepareNearTxLocal();
- p.tx().currentPrepareFuture().listen(fut ->
txPrepareLatch.countDown());
+ p.tx().currentPrepareFuture().listen(txPrepareLatch::countDown);
txPrepareLatch.await(6, TimeUnit.SECONDS);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index 575bfe0acac..b168fc912e2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -1198,7 +1198,7 @@ public class GridEventConsumeSelfTest extends
GridCommonAbstractTest {
return null;
}, 6, "consume-starter");
- starterFut.listen((fut) -> stop.set(true));
+ starterFut.listen(() -> stop.set(true));
IgniteInternalFuture<?> stopperFut = multithreadedAsync(() -> {
while (!stop.get() || !queue.isEmpty()) {
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/RestProcessorInitializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/RestProcessorInitializationTest.java
index 295cb40e8d1..6434f93e8ea 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/RestProcessorInitializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/RestProcessorInitializationTest.java
@@ -114,7 +114,7 @@ public class RestProcessorInitializationTest extends
GridCommonAbstractTest {
@Override protected IgniteInternalFuture<GridRestResponse>
handleAsync0(GridRestRequest req) {
IgniteInternalFuture<GridRestResponse> fut =
super.handleAsync0(req);
- fut.listen(f -> tuple.set(req, f));
+ fut.listen(() -> tuple.set(req, fut));
return fut;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/DistributedProcessCoordinatorLeftTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/DistributedProcessCoordinatorLeftTest.java
index 4342859a282..424be5a3918 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/DistributedProcessCoordinatorLeftTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/DistributedProcessCoordinatorLeftTest.java
@@ -130,7 +130,7 @@ public class DistributedProcessCoordinatorLeftTest extends
GridCommonAbstractTes
// A single message will be sent before this latch
released.
// It is guaranteed by the LIFO order of future listeners
notifying.
if
(!grid.name().equals(getTestIgniteInstanceName(STOP_NODE_IDX)))
- fut.listen(f -> msgSendLatch.countDown());
+ fut.listen(msgSendLatch::countDown);
startLatch.countDown();
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index c0a40ceb757..236aa0c8730 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1052,9 +1052,9 @@ public final class GridTestUtils {
}
};
- runFut.listen(fut -> {
+ runFut.listen(() -> {
try {
- resFut.onDone(fut.get());
+ resFut.onDone(runFut.get());
}
catch (IgniteFutureCancelledCheckedException e) {
resFut.onCancelled();
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
index ccbdf765307..3cc5afa954c 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildSelfTest.java
@@ -413,12 +413,12 @@ public class GridIndexRebuildSelfTest extends
DynamicIndexAbstractSelfTest {
firstRbld = false;
if (slowRebuildIdxFut) {
- rebuildIdxFut.listen(fut -> {
+ rebuildIdxFut.listen(() -> {
try {
U.sleep(1_000);
}
catch (IgniteInterruptedCheckedException e) {
- log.error("Error while slow down " + fut, e);
+ log.error("Error while slow down " + rebuildIdxFut, e);
}
});
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
index 771dcddcce3..0ef1b08aec3 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryDataPageScanTest.java
@@ -288,8 +288,8 @@ public class QueryDataPageScanTest extends
GridCommonAbstractTest {
}
}, 2, "query");
- qryFut.listen((f) -> cancel.set(true));
- updFut.listen((f) -> cancel.set(true));
+ qryFut.listen(() -> cancel.set(true));
+ updFut.listen(() -> cancel.set(true));
long start = U.currentTimeMillis();