http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 1b11688..87d9225 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -611,6 +611,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat } /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 1c1addd..c3e9fbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -125,6 +125,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin ); } + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + /** * @param key Key to add. * @param val Optional update value. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index fa7f367..4272a4d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -244,6 +244,11 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep return accessTtl; } + /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + /** * @param ctx Cache context. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a419887..bc16ff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { */ @SuppressWarnings("unchecked") protected IgniteInternalFuture asyncOp(final Callable<?> op) { + if (!asyncToggled) + return ctx.closures().callLocalSafe(op); + IgniteInternalFuture fail = asyncOpAcquire(); if (fail != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index d34047e..eb5e214 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -272,7 +272,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage false, null, req.keyValueFilter(), - req.partition(), + req.partition() == -1 ? null : req.partition(), req.className(), req.clause(), req.includeMetaData(), http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 60c4662..9f965d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -121,7 +121,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache private int taskHash; /** Partition. */ - private int part; + private int part = -1; /** */ private AffinityTopologyVersion topVer; @@ -478,8 +478,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** * @return partition. */ - @Nullable public Integer partition() { - return part == -1 ? null : part; + public int partition() { + return part; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 6d21dcf..393fb1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @Override public IgniteInternalFuture<Void> loadMissing( final GridCacheContext cacheCtx, - AffinityTopologyVersion topVer, + final AffinityTopologyVersion topVer, final boolean readThrough, boolean async, final Collection<KeyCacheObject> keys, @@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig CacheObject cacheVal = cacheCtx.toCacheObject(val); while (true) { - GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer); try { GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null); @@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig assert txEntry != null || readCommitted() || skipVals; - GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached(); + GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached(); if (readCommitted() || skipVals) { cacheCtx.evicts().touch(e, topologyVersion()); @@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig IgniteTxLocalAdapter.this, /*swap*/cacheCtx.isSwapOrOffheapEnabled(), /*unmarshal*/true, - /**update-metrics*/true, + /*update-metrics*/true, /*event*/!skipVals, CU.subjectId(IgniteTxLocalAdapter.this, cctx), transformClo, http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 32fda87..fee4dd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -328,6 +328,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { if (!allowOverwrite) cctx.topology().readLock(); + GridDhtTopologyFuture topWaitFut = null; + try { GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); @@ -352,19 +354,25 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { waitFut = allowOverwrite ? null : cctx.mvcc().addDataStreamerFuture(topVer); } - else { - fut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { - localUpdate(nodeId, req, updater, topic); - } - }); - } + else + topWaitFut = fut; } finally { if (!allowOverwrite) cctx.topology().readUnlock(); } + if (topWaitFut != null) { + // Need call 'listen' after topology read lock is released. + topWaitFut.listen(new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> e) { + localUpdate(nodeId, req, updater, topic); + } + }); + + return; + } + if (job != null) { try { job.call(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java index 3405b53..4c037b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java @@ -226,7 +226,7 @@ public class IgfsContext { */ public void runInIgfsThreadPool(Runnable r) { try { - igfsSvc.submit(r); + igfsSvc.execute(r); } catch (RejectedExecutionException ignored) { // This exception will happen if network speed is too low and data comes faster @@ -252,4 +252,4 @@ public class IgfsContext { return mgr; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java index e534800..4490a68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager { IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key); if (secReader != null) { + Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL); + fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() { @Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException { byte[] res = fut.get(); @@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager { return res; } - }); + }, exec); } else igfsCtx.metrics().addReadBlocks(1, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index ab4ee85..6b23e80 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -326,7 +326,7 @@ public final class IgfsImpl implements IgfsEx { // Submit it to the thread pool immediately. assert dualPool != null; - dualPool.submit(batch); + dualPool.execute(batch); // Spin in case another batch is currently running. while (true) { http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java index 9388a8e..7cba9bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java @@ -139,6 +139,7 @@ public class OdbcProcessor extends GridProcessorAdapter { .logger(log) .selectorCount(DFLT_SELECTOR_CNT) .gridName(ctx.gridName()) + .serverName("odbc") .tcpNoDelay(DFLT_TCP_NODELAY) .directBuffer(DFLT_TCP_DIRECT_BUF) .byteOrder(ByteOrder.nativeOrder()) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 8ff15d5..5383151 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.compute; +import java.util.concurrent.Executor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.binary.BinaryObject; @@ -409,6 +410,11 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec) { + throw new UnsupportedOperationException("Chain operation is not supported."); + } + + /** {@inheritDoc} */ @Override public Throwable error() { return fut.error(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java index b403654..71eca65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java @@ -21,7 +21,6 @@ import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.rest.GridRestCommand; import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler; @@ -38,8 +37,6 @@ import org.apache.ignite.internal.util.nio.GridNioSession; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_DECREMENT; @@ -72,24 +69,16 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr /** Handler. */ private final GridRestProtocolHandler hnd; - /** JDK marshaller. */ - private final Marshaller jdkMarshaller = new JdkMarshaller(); - - /** Context. */ - private final GridKernalContext ctx; - /** * Creates listener which will convert incoming tcp packets to rest requests and forward them to * a given rest handler. * * @param log Logger to use. * @param hnd Rest handler. - * @param ctx Context. */ - public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd, GridKernalContext ctx) { + public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd) { this.log = log; this.hnd = hnd; - this.ctx = ctx; } /** {@inheritDoc} */ @@ -462,4 +451,4 @@ public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<Gr return new GridTuple3<>(cmd, quiet, retKey); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java index 1c1c6dc..3ba6d8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java @@ -145,7 +145,7 @@ public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridCli */ public GridTcpRestNioListener(IgniteLogger log, GridTcpRestProtocol proto, GridRestProtocolHandler hnd, GridKernalContext ctx) { - memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd, ctx); + memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd); redisLsnr = new GridRedisNioListener(log, hnd, ctx); this.log = log; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java index 6338fcc..2a002a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java @@ -257,6 +257,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter { .logger(log) .selectorCount(cfg.getSelectorCount()) .gridName(ctx.gridName()) + .serverName("tcp-rest") .tcpNoDelay(cfg.isNoDelay()) .directBuffer(cfg.isDirectBuffer()) .byteOrder(ByteOrder.nativeOrder()) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 6c26363..aeb3ef4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1131,7 +1131,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { // Start service in its own thread. final ExecutorService exe = svcCtx.executor(); - exe.submit(new Runnable() { + exe.execute(new Runnable() { @Override public void run() { try { svc.execute(svcCtx); @@ -1394,7 +1394,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { return; try { - depExe.submit(new BusyRunnable() { + depExe.execute(new BusyRunnable() { @Override public void run0() { onSystemCacheUpdated(deps); } @@ -1587,7 +1587,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { else topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); - depExe.submit(new BusyRunnable() { + depExe.execute(new BusyRunnable() { @Override public void run0() { ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index e1937bb..3dfb3c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -153,7 +153,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; -import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; import org.apache.ignite.cluster.ClusterGroupEmptyException; @@ -506,10 +505,27 @@ public abstract class IgniteUtils { } }; - /** - * Initializes enterprise check. + /** */ + private static final boolean assertionsEnabled; + + /* + * */ static { + boolean assertionsEnabled0 = true; + + try { + assert false; + + assertionsEnabled0 = false; + } + catch (AssertionError ignored) { + assertionsEnabled0 = true; + } + finally { + assertionsEnabled = assertionsEnabled0; + } + String osName = System.getProperty("os.name"); String osLow = osName.toLowerCase(); @@ -1284,6 +1300,27 @@ public abstract class IgniteUtils { } /** + * @param threadId Thread ID. + * @param sb Builder. + */ + public static void printStackTrace(long threadId, GridStringBuilder sb) { + ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + + ThreadInfo threadInfo = mxBean.getThreadInfo(threadId, Integer.MAX_VALUE); + + printThreadInfo(threadInfo, sb, Collections.<Long>emptySet()); + } + + /** + * @return {@code true} if there is java level deadlock. + */ + public static boolean deadlockPresent() { + ThreadMXBean mxBean = ManagementFactory.getThreadMXBean(); + + return !F.isEmpty(mxBean.findDeadlockedThreads()); + } + + /** * Prints single thread info to a buffer. * * @param threadInfo Thread info. @@ -6141,6 +6178,13 @@ public abstract class IgniteUtils { } /** + * @return {@code True} if assertions enabled. + */ + public static boolean assertionsEnabled() { + return assertionsEnabled; + } + + /** * Gets OS JDK string. * * @return OS JDK string. @@ -8337,6 +8381,18 @@ public abstract class IgniteUtils { } /** + * Gets absolute value for long. If argument is {@link Long#MIN_VALUE}, then {@code 0} is returned. + * + * @param i Argument. + * @return Absolute value. + */ + public static long safeAbs(long i) { + i = Math.abs(i); + + return i < 0 ? 0 : i; + } + + /** * Gets wrapper class for a primitive type. * * @param cls Class. If {@code null}, method is no-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java new file mode 100644 index 0000000..e9ec74b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.LockSupport; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.NotNull; + +/** + * Striped executor. + */ +public class StripedExecutor implements ExecutorService { + /** Stripes. */ + private final Stripe[] stripes; + + /** For starvation checks. */ + private final long[] completedCntrs; + + /** */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param cnt Count. + */ + public StripedExecutor(int cnt, String gridName, String poolName, final IgniteLogger log) { + A.ensure(cnt > 0, "cnt > 0"); + + boolean success = false; + + stripes = new Stripe[cnt]; + + completedCntrs = new long[cnt]; + + Arrays.fill(completedCntrs, -1); + + this.log = log; + + try { + for (int i = 0; i < cnt; i++) { + stripes[i] = new StripeConcurrentQueue( + gridName, + poolName, + i, + log); + + stripes[i].start(); + } + + success = true; + } + catch (Error | RuntimeException e) { + U.error(log, "Failed to initialize striped pool.", e); + + throw e; + } + finally { + if (!success) { + for (Stripe stripe : stripes) { + if (stripe != null) + stripe.signalStop(); + } + + for (Stripe stripe : stripes) { + if (stripe != null) + stripe.awaitStop(); + } + } + } + } + + /** + * Checks starvation in striped pool. Maybe too verbose + * but this is needed to faster debug possible issues. + */ + public void checkStarvation() { + for (int i = 0; i < stripes.length; i++) { + Stripe stripe = stripes[i]; + + long completedCnt = stripe.completedCnt; + + boolean active = stripe.active; + + if (completedCntrs[i] != -1 && + completedCntrs[i] == completedCnt && + active) { + boolean deadlockPresent = U.deadlockPresent(); + + GridStringBuilder sb = new GridStringBuilder(); + + sb.a(">>> Possible starvation in striped pool: ") + .a(stripe.thread.getName()).a(U.nl()) + .a(stripe.queueToString()).a(U.nl()) + .a("deadlock: ").a(deadlockPresent).a(U.nl()) + .a("completed: ").a(completedCnt).a(U.nl()); + + U.printStackTrace( + stripe.thread.getId(), + sb); + + String msg = sb.toString(); + + U.warn(log, msg); + } + + if (active || completedCnt > 0) + completedCntrs[i] = completedCnt; + } + } + + /** + * @return Stripes count. + */ + public int stripes() { + return stripes.length; + } + + /** + * Execute command. + * + * @param idx Index. + * @param cmd Command. + */ + public void execute(int idx, Runnable cmd) { + if (idx == -1) + execute(cmd); + else { + assert idx >= 0 : idx; + + stripes[idx % stripes.length].execute(cmd); + } + } + + /** {@inheritDoc} */ + @Override public void shutdown() { + signalStop(); + } + + /** {@inheritDoc} */ + @Override public void execute(@NotNull Runnable cmd) { + stripes[ThreadLocalRandom.current().nextInt(stripes.length)].execute(cmd); + } + + /** + * {@inheritDoc} + * + * @return Empty list (always). + */ + @NotNull @Override public List<Runnable> shutdownNow() { + signalStop(); + + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public boolean awaitTermination( + long timeout, + @NotNull TimeUnit unit + ) throws InterruptedException { + awaitStop(); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean isShutdown() { + for (Stripe stripe : stripes) { + if (stripe != null && stripe.stopping) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isTerminated() { + for (Stripe stripe : stripes) { + if (stripe.thread.getState() != Thread.State.TERMINATED) + return false; + } + + return true; + } + + /** + * Stops executor. + */ + public void stop() { + signalStop(); + + awaitStop(); + } + + /** + * Signals all stripes. + */ + private void signalStop() { + for (Stripe stripe : stripes) + stripe.signalStop(); + } + + /** + * @throws IgniteInterruptedException If interrupted. + */ + private void awaitStop() throws IgniteInterruptedException { + for (Stripe stripe : stripes) + stripe.awaitStop(); + } + + /** + * @return Return total queue size of all stripes. + */ + public int queueSize() { + int size = 0; + + for (Stripe stripe : stripes) + size += stripe.queueSize(); + + return size; + } + + /** + * @return Completed tasks count. + */ + public long completedTasks() { + long cnt = 0; + + for (Stripe stripe : stripes) + cnt += stripe.completedCnt; + + return cnt; + } + + /** + * Operation not supported. + */ + @NotNull @Override public <T> Future<T> submit( + @NotNull Runnable task, + T res + ) { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @NotNull @Override public Future<?> submit(@NotNull Runnable task) { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @NotNull @Override public <T> Future<T> submit(@NotNull Callable<T> task) { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @NotNull @Override public <T> List<Future<T>> invokeAll(@NotNull Collection<? extends Callable<T>> tasks) + throws InterruptedException { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @NotNull @Override public <T> List<Future<T>> invokeAll( + @NotNull Collection<? extends Callable<T>> tasks, + long timeout, + @NotNull TimeUnit unit + ) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @NotNull @Override public <T> T invokeAny(@NotNull Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + /** + * Operation not supported. + */ + @Override public <T> T invokeAny( + @NotNull Collection<? extends Callable<T>> tasks, + long timeout, + @NotNull TimeUnit unit + ) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripedExecutor.class, this); + } + + /** + * Stripe. + */ + private static abstract class Stripe implements Runnable { + /** */ + private final String gridName; + + /** */ + private final String poolName; + + /** */ + private final int idx; + + /** */ + private final IgniteLogger log; + + /** Stopping flag. */ + private volatile boolean stopping; + + /** */ + private volatile long completedCnt; + + /** */ + private volatile boolean active; + + /** Thread executing the loop. */ + protected Thread thread; + + /** + * @param gridName Grid name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + public Stripe( + String gridName, + String poolName, + int idx, + IgniteLogger log + ) { + this.gridName = gridName; + this.poolName = poolName; + this.idx = idx; + this.log = log; + } + + /** + * Starts the stripe. + */ + void start() { + thread = new IgniteThread(gridName, poolName + "-stripe-" + idx, this); + + thread.start(); + } + + /** + * Stop the stripe. + */ + void signalStop() { + stopping = true; + + U.interrupt(thread); + } + + /** + * Await thread stop. + */ + void awaitStop() { + try { + if (thread != null) + thread.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void run() { + while (!stopping) { + Runnable cmd; + + try { + cmd = take(); + + if (cmd != null) { + active = true; + + try { + cmd.run(); + } + finally { + active = false; + completedCnt++; + } + } + } + catch (InterruptedException e) { + stopping = true; + + Thread.currentThread().interrupt(); + + return; + } + catch (Throwable e) { + U.error(log, "Failed to execute runnable.", e); + } + } + } + + /** + * Execute the command. + * + * @param cmd Command. + */ + abstract void execute(Runnable cmd); + + /** + * @return Next runnable. + * @throws InterruptedException If interrupted. + */ + abstract Runnable take() throws InterruptedException; + + /** + * @return Queue size. + */ + abstract int queueSize(); + + /** + * @return Stripe's queue to string presentation. + */ + abstract String queueToString(); + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Stripe.class, this); + } + } + + /** + * Stripe. + */ + private static class StripeConcurrentQueue extends Stripe { + /** Queue. */ + private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); + + /** */ + private volatile boolean parked; + + /** + * @param gridName Grid name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + public StripeConcurrentQueue( + String gridName, + String poolName, + int idx, + IgniteLogger log + ) { + super(gridName, + poolName, + idx, + log); + } + + /** {@inheritDoc} */ + @Override Runnable take() throws InterruptedException { + Runnable r; + + for (int i = 0; i < 2048; i++) { + r = queue.poll(); + + if (r != null) + return r; + } + + parked = true; + + try { + for (;;) { + r = queue.poll(); + + if (r != null) + return r; + + LockSupport.park(); + + if (Thread.interrupted()) + throw new InterruptedException(); + } + } + finally { + parked = false; + } + } + + /** {@inheritDoc} */ + void execute(Runnable cmd) { + queue.add(cmd); + + if (parked) + LockSupport.unpark(thread); + } + + /** {@inheritDoc} */ + @Override String queueToString() { + return String.valueOf(queue); + } + + /** {@inheritDoc} */ + @Override int queueSize() { + return queue.size(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripeConcurrentQueue.class, this, super.toString()); + } + } + + /** + * Stripe. + */ + private static class StripeConcurrentQueueNoPark extends Stripe { + /** Queue. */ + private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); + + /** + * @param gridName Grid name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + public StripeConcurrentQueueNoPark( + String gridName, + String poolName, + int idx, + IgniteLogger log + ) { + super(gridName, + poolName, + idx, + log); + } + + /** {@inheritDoc} */ + @Override Runnable take() { + for (;;) { + Runnable r = queue.poll(); + + if (r != null) + return r; + } + } + + /** {@inheritDoc} */ + void execute(Runnable cmd) { + queue.add(cmd); + } + + /** {@inheritDoc} */ + @Override int queueSize() { + return queue.size(); + } + + /** {@inheritDoc} */ + @Override String queueToString() { + return String.valueOf(queue); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString()); + } + } + + /** + * Stripe. + */ + private static class StripeConcurrentBlockingQueue extends Stripe { + /** Queue. */ + private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + + /** + * @param gridName Grid name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + public StripeConcurrentBlockingQueue( + String gridName, + String poolName, + int idx, + IgniteLogger log + ) { + super(gridName, + poolName, + idx, + log); + } + + /** {@inheritDoc} */ + @Override Runnable take() throws InterruptedException { + return queue.take(); + } + + /** {@inheritDoc} */ + void execute(Runnable cmd) { + queue.add(cmd); + } + + /** {@inheritDoc} */ + @Override int queueSize() { + return queue.size(); + } + + /** {@inheritDoc} */ + @Override String queueToString() { + return String.valueOf(queue); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java index 6baedbd..dc63adc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.util.future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> { } /** {@inheritDoc} */ + @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<? super IgniteInternalFuture<T>, T1> doneCb, Executor exec) { + final GridFutureAdapter<T1> fut = new GridFutureAdapter<>(); + + exec.execute(new Runnable() { + @Override public void run() { + try { + fut.onDone(doneCb.apply(GridFinishedFuture.this)); + } + catch (GridClosureException e) { + fut.onDone(e.unwrap()); + } + catch (RuntimeException | Error e) { + fut.onDone(e); + + throw e; + } + } + }); + + return fut; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridFinishedFuture.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index 2cd534e..c8d85cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.future; import java.util.Arrays; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import org.apache.ignite.IgniteCheckedException; @@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** {@inheritDoc} */ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) { - return new ChainFuture<>(this, doneCb); + return new ChainFuture<>(this, doneCb, null); + } + + /** {@inheritDoc} */ + @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, + Executor exec) { + return new ChainFuture<>(this, doneCb, exec); } /** @@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** * @param fut Future. * @param doneCb Closure. + * @param cbExec Optional executor to run callback. */ ChainFuture( GridFutureAdapter<R> fut, - IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb + IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, + @Nullable Executor cbExec ) { this.fut = fut; this.doneCb = doneCb; - fut.listen(new GridFutureChainListener<>(this, doneCb)); + fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java index 947b2ad..367f5d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java @@ -17,15 +17,17 @@ package org.apache.ignite.internal.util.future; +import java.util.concurrent.Executor; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteInClosure; +import org.jetbrains.annotations.Nullable; /** * Future listener to fill chained future with converted result of the source future. */ -public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> { +class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> { /** */ private static final long serialVersionUID = 0L; @@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte /** Done callback. */ private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb; + /** */ + private Executor cbExec; + /** * Constructs chain listener. + * * @param fut Target future. * @param doneCb Done callback. + * @param cbExec Optional executor to run callback. */ public GridFutureChainListener( GridFutureAdapter<R> fut, - IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb + IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb, + @Nullable Executor cbExec ) { this.fut = fut; this.doneCb = doneCb; + this.cbExec = cbExec; } /** {@inheritDoc} */ - @Override public void apply(IgniteInternalFuture<T> t) { + @Override public void apply(final IgniteInternalFuture<T> t) { + if (cbExec != null) { + cbExec.execute(new Runnable() { + @Override public void run() { + applyCallback(t); + } + }); + } + else + applyCallback(t); + } + + /** + * @param t Target future. + */ + private void applyCallback(IgniteInternalFuture<T> t) { try { fut.onDone(doneCb.apply(t)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java index 6820dc7..d108b56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java @@ -201,7 +201,7 @@ public class IpcToNioAdapter<T> { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { assert ses == IpcToNioAdapter.this.ses; return send((Message)msg); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java index 9b014ec..f2ab932 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java @@ -35,14 +35,24 @@ public abstract class GridAbstractCommunicationClient implements GridCommunicati /** Metrics listener. */ protected final GridNioMetricsListener metricsLsnr; + /** */ + private final int connIdx; + /** + * @param connIdx Connection index. * @param metricsLsnr Metrics listener. */ - protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener metricsLsnr) { + protected GridAbstractCommunicationClient(int connIdx, @Nullable GridNioMetricsListener metricsLsnr) { + this.connIdx = connIdx; this.metricsLsnr = metricsLsnr; } /** {@inheritDoc} */ + @Override public int connectionIndex() { + return connIdx; + } + + /** {@inheritDoc} */ @Override public boolean close() { return reserves.compareAndSet(0, -1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 0de54e9..71b2c24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -96,15 +96,20 @@ public interface GridCommunicationClient { /** * @param nodeId Remote node ID. Provided only for sync clients. * @param msg Message to send. - * @param closure Ack closure. + * @param c Ack closure. * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure) + public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> c) throws IgniteCheckedException; /** * @return {@code True} if send is asynchronous. */ public boolean async(); + + /** + * @return Connection index. + */ + public int connectionIndex(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java index 213fd8d..7987d3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridConnectionBytesVerifyFilter.java @@ -62,13 +62,20 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException { + @Override public void onExceptionCaught( + GridNioSession ses, + IgniteCheckedException ex + ) throws IgniteCheckedException { proceedExceptionCaught(ses, ex); } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -137,4 +144,4 @@ public class GridConnectionBytesVerifyFilter extends GridNioFilterAdapter { @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { proceedSessionWriteTimeout(ses); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java index 9925d2e..40c87cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java @@ -107,8 +107,12 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -139,4 +143,4 @@ public class GridNioAsyncNotifyFilter extends GridNioFilterAdapter { "originalEx=" + ex + ", ex=" + e + ']'); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java index 7083ccf..343e625 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioCodecFilter.java @@ -71,20 +71,27 @@ public class GridNioCodecFilter extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException { + @Override public void onExceptionCaught( + GridNioSession ses, + IgniteCheckedException ex + ) throws IgniteCheckedException { proceedExceptionCaught(ses, ex); } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { // No encoding needed in direct mode. if (directMode) - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); try { ByteBuffer res = parser.encode(ses, msg); - return proceedSessionWrite(ses, res); + return proceedSessionWrite(ses, res, fut); } catch (IOException e) { throw new GridNioException(e); @@ -137,4 +144,4 @@ public class GridNioCodecFilter extends GridNioFilterAdapter { @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException { proceedSessionWriteTimeout(ses); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java index 5f88b1f..f7928c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilter.java @@ -105,10 +105,15 @@ public interface GridNioFilter { * * @param ses Session instance. * @param msg Message to send. - * @return Write future. + * @param fut {@code True} if write future should be created. + * @return Write future or {@code null}. * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter. */ - public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException; + public GridNioFuture<?> proceedSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException; /** * Forwards session close request to the next logical filter in filter chain. @@ -149,10 +154,11 @@ public interface GridNioFilter { * * @param ses Session on which message should be written. * @param msg Message being written. - * @return Write future. + * @param fut {@code True} if write future should be created. + * @return Write future or {@code null}. * @throws GridNioException If GridNioException occurred while handling event. */ - public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException; + public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException; /** * Invoked when a new messages received. @@ -241,4 +247,4 @@ public interface GridNioFilter { * @throws IgniteCheckedException If filter is not in chain or GridNioException occurred in the underlying filter. */ public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java index 18ab1b2..58ddae5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterAdapter.java @@ -108,10 +108,14 @@ public abstract class GridNioFilterAdapter implements GridNioFilter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> proceedSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> proceedSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { checkNext(); - return nextFilter.onSessionWrite(ses, msg); + return nextFilter.onSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -180,4 +184,4 @@ public abstract class GridNioFilterAdapter implements GridNioFilter { throw new GridNioException("Failed to proceed with filter call since previous filter is not set " + "(do you use filter outside the filter chain?): " + getClass().getName()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java index a3a74e3..8cc690b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFilterChain.java @@ -181,8 +181,12 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { * @return Send future. * @throws IgniteCheckedException If IgniteCheckedException occurred while handling event. */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { - return tail.onSessionWrite(ses, msg); + @Override public GridNioFuture<?> onSessionWrite( + GridNioSession ses, + Object msg, + boolean fut + ) throws IgniteCheckedException { + return tail.onSessionWrite(ses, msg, fut); } /** @@ -255,9 +259,9 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -290,4 +294,4 @@ public class GridNioFilterChain<T> extends GridNioFilterAdapter { return proceedResumeReads(ses); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index b02acc8..6c0c9c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -45,9 +45,9 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { /** * Sets ack closure which will be applied when ack received. * - * @param closure Ack closure. + * @param c Ack closure. */ - public void ackClosure(IgniteInClosure<IgniteException> closure); + public void ackClosure(IgniteInClosure<IgniteException> c); /** * The method will be called when ack received. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 35480ac..6258c13 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -35,8 +35,8 @@ public class GridNioRecoveryDescriptor { /** Number of acknowledged messages. */ private long acked; - /** Unacknowledged message futures. */ - private final ArrayDeque<GridNioFuture<?>> msgFuts; + /** Unacknowledged messages. */ + private final ArrayDeque<SessionWriteRequest> msgReqs; /** Number of messages to resend. */ private int resendCnt; @@ -77,23 +77,40 @@ public class GridNioRecoveryDescriptor { /** Number of descriptor reservations (for info purposes). */ private int reserveCnt; + /** */ + private final boolean pairedConnections; + /** + * @param pairedConnections {@code True} if in/out connections pair is used for communication with node. * @param queueLimit Maximum size of unacknowledged messages queue. * @param node Node. * @param log Logger. */ - public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) { + public GridNioRecoveryDescriptor( + boolean pairedConnections, + int queueLimit, + ClusterNode node, + IgniteLogger log + ) { assert !node.isLocal() : node; assert queueLimit > 0; - msgFuts = new ArrayDeque<>(queueLimit); + msgReqs = new ArrayDeque<>(queueLimit); + this.pairedConnections = pairedConnections; this.queueLimit = queueLimit; this.node = node; this.log = log; } /** + * @return {@code True} if in/out connections pair is used for communication with node. + */ + public boolean pairedConnections() { + return pairedConnections; + } + + /** * @return Connect count. */ public long incrementConnectCount() { @@ -154,19 +171,19 @@ public class GridNioRecoveryDescriptor { } /** - * @param fut NIO future. + * @param req Write request. * @return {@code False} if queue limit is exceeded. */ - public boolean add(GridNioFuture<?> fut) { - assert fut != null; + public boolean add(SessionWriteRequest req) { + assert req != null; - if (!fut.skipRecovery()) { + if (!req.skipRecovery()) { if (resendCnt == 0) { - msgFuts.addLast(fut); + msgReqs.addLast(req); sentCnt++; - return msgFuts.size() < queueLimit; + return msgReqs.size() < queueLimit; } else resendCnt--; @@ -181,21 +198,19 @@ public class GridNioRecoveryDescriptor { public void ackReceived(long rcvCnt) { if (log.isDebugEnabled()) log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt + - ", msgFuts=" + msgFuts.size() + ']'); + ", msgReqs=" + msgReqs.size() + ']'); while (acked < rcvCnt) { - GridNioFuture<?> fut = msgFuts.pollFirst(); + SessionWriteRequest req = msgReqs.pollFirst(); - assert fut != null : "Missed message future [rcvCnt=" + rcvCnt + + assert req != null : "Missed message [rcvCnt=" + rcvCnt + ", acked=" + acked + ", desc=" + this + ']'; - assert fut.isDone() : fut; - - if (fut.ackClosure() != null) - fut.ackClosure().apply(null); + if (req.ackClosure() != null) + req.ackClosure().apply(null); - fut.onAckReceived(); + req.onAckReceived(); acked++; } @@ -214,7 +229,7 @@ public class GridNioRecoveryDescriptor { * @return {@code False} if descriptor is reserved. */ public boolean onNodeLeft() { - GridNioFuture<?>[] futs = null; + SessionWriteRequest[] reqs = null; synchronized (this) { nodeLeft = true; @@ -222,24 +237,24 @@ public class GridNioRecoveryDescriptor { if (reserved) return false; - if (!msgFuts.isEmpty()) { - futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + if (!msgReqs.isEmpty()) { + reqs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]); - msgFuts.clear(); + msgReqs.clear(); } } - if (futs != null) - completeOnNodeLeft(futs); + if (reqs != null) + notifyOnNodeLeft(reqs); return true; } /** - * @return Message futures for unacknowledged messages. + * @return Requests for unacknowledged messages. */ - public Deque<GridNioFuture<?>> messagesFutures() { - return msgFuts; + public Deque<SessionWriteRequest> messagesRequests() { + return msgReqs; } /** @@ -277,14 +292,14 @@ public class GridNioRecoveryDescriptor { if (!nodeLeft) ackReceived(rcvCnt); - resendCnt = msgFuts.size(); + resendCnt = msgReqs.size(); } } /** * */ - public void connected() { + public void onConnected() { synchronized (this) { assert reserved : this; assert !connected : this; @@ -306,10 +321,37 @@ public class GridNioRecoveryDescriptor { } /** + * @return Connected flag. + */ + public boolean connected() { + synchronized (this) { + return connected; + } + } + + /** + * @return Reserved flag. + */ + public boolean reserved() { + synchronized (this) { + return reserved; + } + } + + /** + * @return Current handshake index. + */ + public Long handshakeIndex() { + synchronized (this) { + return handshakeReq != null ? handshakeReq.get1() : null; + } + } + + /** * */ public void release() { - GridNioFuture<?>[] futs = null; + SessionWriteRequest[] futs = null; synchronized (this) { connected = false; @@ -329,15 +371,15 @@ public class GridNioRecoveryDescriptor { notifyAll(); } - if (nodeLeft && !msgFuts.isEmpty()) { - futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); + if (nodeLeft && !msgReqs.isEmpty()) { + futs = msgReqs.toArray(new SessionWriteRequest[msgReqs.size()]); - msgFuts.clear(); + msgReqs.clear(); } } if (futs != null) - completeOnNodeLeft(futs); + notifyOnNodeLeft(futs); } /** @@ -398,16 +440,16 @@ public class GridNioRecoveryDescriptor { } /** - * @param futs Futures to complete. + * @param reqs Requests to notify about error. */ - private void completeOnNodeLeft(GridNioFuture<?>[] futs) { - for (GridNioFuture<?> msg : futs) { - IOException e = new IOException("Failed to send message, node has left: " + node.id()); + private void notifyOnNodeLeft(SessionWriteRequest[] reqs) { + IOException e = new IOException("Failed to send message, node has left: " + node.id()); - ((GridNioFutureImpl)msg).onDone(e); + for (SessionWriteRequest req : reqs) { + req.onError(e); - if (msg.ackClosure() != null) - msg.ackClosure().apply(new IgniteException(e)); + if (req.ackClosure() != null) + req.ackClosure().apply(new IgniteException(e)); } }
