ignite-comm-balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b561f77 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b561f77 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b561f77 Branch: refs/heads/ignite-4371 Commit: 2b561f7754c41ac7c972224b059e94c553cea427 Parents: d6a9767 Author: sboikov <[email protected]> Authored: Wed Dec 7 12:30:29 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Dec 7 14:28:34 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/IgniteInternalFuture.java | 11 +++++++ .../transactions/IgniteTxLocalAdapter.java | 8 +++--- .../processors/igfs/IgfsDataManager.java | 6 +++- .../platform/compute/PlatformCompute.java | 6 ++++ .../util/future/GridFinishedFuture.java | 24 ++++++++++++++++ .../internal/util/future/GridFutureAdapter.java | 15 ++++++++-- .../util/future/GridFutureChainListener.java | 30 ++++++++++++++++++-- .../TxDeadlockDetectionNoHangsTest.java | 2 +- 8 files changed, 90 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index b80a755..789556d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; @@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> { public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb); /** + * Make a chained future to convert result of this future (when complete) into a new format. + * It is guaranteed that done callback will be called only ONCE. + * + * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result. + * @param exec Executor to run callback. + * @return Chained future that finishes after this future completes and done callback is called. + */ + public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec); + + /** * @return Error value if future has already been completed with error. */ public Throwable error(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 6d21dcf..393fb1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, - AffinityTopologyVersion topVer, + final AffinityTopologyVersion topVer, final boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, @@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheObject cacheVal = cacheCtx.toCacheObject(val); while (true) { - GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer); try { GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null); @@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig assert txEntry != null || readCommitted() || skipVals; - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); if (readCommitted() || skipVals) { cacheCtx.evicts().touch(e, topologyVersion()); @@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig IgniteTxLocalAdapter.this, /*swap*/cacheCtx.isSwapOrOffheapEnabled(), /*unmarshal*/true, - /**update-metrics*/true, + /*update-metrics*/true, /*event*/!skipVals, CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index e534800..4490a68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager { IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key); if (secReader != null) { + Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL); + fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() { @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException { byte[] res = fut.get(); @@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager { return res; } - }); + }, exec); } else igfsCtx.metrics().addReadBlocks(1, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 8ff15d5..5383151 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.compute; +import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.binary.BinaryObject; @@ -409,6 +410,11 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec) { + throw new UnsupportedOperationException("Chain operation is not supported."); + } + + /** {@inheritDoc} */ @Override public Throwable error() { return fut.error(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index 6baedbd..dc63adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } /** {@inheritDoc} */ + @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<? super IgniteInternalFuture<T>, T1> doneCb, Executor exec) { + final GridFutureAdapter<T1> fut = new GridFutureAdapter<>(); + + exec.execute(new Runnable() { + @Override public void run() { + try { + fut.onDone(doneCb.apply(GridFinishedFuture.this)); + } + catch (GridClosureException e) { + fut.onDone(e.unwrap()); + } + catch (RuntimeException | Error e) { + fut.onDone(e); + + throw e; + } + } + }); + + return fut; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFinishedFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 2cd534e..c8d85cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.future; import java.util.Arrays; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.ignite.IgniteCheckedException; @@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) { - return new ChainFuture<>(this, doneCb); + return new ChainFuture<>(this, doneCb, null); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, + Executor exec) { + return new ChainFuture<>(this, doneCb, exec); } /** @@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** * @param fut Future. * @param doneCb Closure. + * @param cbExec Optional executor to run callback. */ ChainFuture( GridFutureAdapter<R> fut, - IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb + IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, + @Nullable Executor cbExec ) { this.fut = fut; this.doneCb = doneCb; - fut.listen(new GridFutureChainListener<>(this, doneCb)); + fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java index 947b2ad..15ef555 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java @@ -17,15 +17,17 @@ package org.apache.ignite.internal.util.future; +import java.util.concurrent.Executor; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; /** * Future listener to fill chained future with converted result of the source future. */ -public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> { +class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; @@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte /** Done callback. */ private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb; + /** */ + private Executor cbExec; + /** * Constructs chain listener. + * * @param fut Target future. * @param doneCb Done callback. + * @param cbExec Optional executor to run callback. */ public GridFutureChainListener( GridFutureAdapter<R> fut, - IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb + IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb, + @Nullable Executor cbExec ) { this.fut = fut; this.doneCb = doneCb; + this.cbExec = cbExec; } /** {@inheritDoc} */ - @Override public void apply(IgniteInternalFuture<T> t) { + @Override public void apply(final IgniteInternalFuture<T> t) { + if (cbExec != null) { + cbExec.execute(new Runnable() { + @Override public void run() { + apply(t); + } + }); + } + else + applyCallback(t); + } + + /** + * @param t Target future. + */ + private void applyCallback(IgniteInternalFuture<T> t) { try { fut.onDone(doneCb.apply(t)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java index c9d18eb..e9d74ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java @@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest { tx.commit(); } catch (Exception e) { - e.printStackTrace(); + log.info("Ignore error: " + e); } } }, NODES_CNT * 3, "tx-thread");
