IGNITE-3220 I/O bottleneck on server/client cluster configuration. Communications optimizations: - possibility to open separate in/out connections; - possibility to have multiple connections between nodes; - implemented NIO session balancing between NIO threads; - reduced the amount of work and the number of blocking calls in NIO threads. Other: - implemented StripedExecutor for cache message handling; - added 'io test' messages for IO performance testing.
(cherry picked from commit 10ade28) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05dd08b9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05dd08b9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05dd08b9 Branch: refs/heads/master Commit: 05dd08b993e2d7f88176c051463b178431714f85 Parents: 57eb47f Author: sboikov <[email protected]> Authored: Fri Dec 9 12:28:47 2016 +0300 Committer: sboikov <[email protected]> Committed: Fri Dec 16 14:01:53 2016 +0300 ---------------------------------------------------------------------- .../ignite/examples/ExampleNodeStartup.java | 2 +- .../examples/datagrid/CachePutGetExample.java | 2 +- .../scalar/examples/ScalarJvmCloudExample.scala | 2 +- .../rest/ClientMemcachedProtocolSelfTest.java | 4 +- .../rest/protocols/tcp/MockNioSession.java | 25 +- .../apache/ignite/IgniteSystemProperties.java | 3 + .../cache/store/CacheLoadOnlyStoreAdapter.java | 6 +- .../configuration/IgniteConfiguration.java | 50 +- .../internal/GridEventConsumeHandler.java | 2 +- .../ignite/internal/GridJobContextImpl.java | 4 +- .../ignite/internal/GridKernalContext.java | 9 + .../ignite/internal/GridKernalContextImpl.java | 16 +- .../internal/GridPerformanceSuggestions.java | 2 +- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../ignite/internal/IgniteInternalFuture.java | 11 + .../apache/ignite/internal/IgniteKernal.java | 85 +- .../org/apache/ignite/internal/IgnitionEx.java | 32 +- .../GridClientConnectionManagerAdapter.java | 1 + .../client/router/impl/GridTcpRouterImpl.java | 1 + .../managers/communication/GridIoManager.java | 207 ++- .../managers/communication/GridIoMessage.java | 13 + .../communication/GridIoMessageFactory.java | 12 +- .../communication/IgniteIoTestMessage.java | 235 +++ .../processors/cache/GridCacheAdapter.java | 26 +- .../processors/cache/GridCacheMessage.java | 7 + .../processors/cache/GridCacheUtils.java | 35 + 
.../processors/cache/IgniteCacheProxy.java | 8 + .../distributed/GridDistributedLockRequest.java | 5 + .../GridDistributedTxFinishResponse.java | 6 + .../GridDistributedUnlockRequest.java | 5 + .../distributed/dht/GridDhtCacheAdapter.java | 3 +- .../distributed/dht/GridDhtLockResponse.java | 9 +- .../dht/atomic/GridDhtAtomicCache.java | 5 +- .../GridDhtAtomicSingleUpdateRequest.java | 5 + .../dht/atomic/GridDhtAtomicUpdateRequest.java | 5 + .../atomic/GridNearAtomicFullUpdateRequest.java | 5 + .../GridNearAtomicSingleUpdateRequest.java | 5 + .../distributed/near/GridNearGetRequest.java | 5 + .../local/atomic/GridLocalAtomicCache.java | 3 + .../query/GridCacheDistributedQueryManager.java | 2 +- .../cache/query/GridCacheQueryRequest.java | 6 +- .../transactions/IgniteTxLocalAdapter.java | 8 +- .../datastreamer/DataStreamProcessor.java | 22 +- .../internal/processors/igfs/IgfsContext.java | 4 +- .../processors/igfs/IgfsDataManager.java | 6 +- .../internal/processors/igfs/IgfsImpl.java | 2 +- .../internal/processors/odbc/OdbcProcessor.java | 1 + .../platform/compute/PlatformCompute.java | 6 + .../tcp/GridTcpMemcachedNioListener.java | 15 +- .../protocols/tcp/GridTcpRestNioListener.java | 2 +- .../rest/protocols/tcp/GridTcpRestProtocol.java | 1 + .../service/GridServiceProcessor.java | 6 +- .../ignite/internal/util/IgniteUtils.java | 62 +- .../ignite/internal/util/StripedExecutor.java | 667 +++++++++ .../util/future/GridFinishedFuture.java | 24 + .../internal/util/future/GridFutureAdapter.java | 15 +- .../util/future/GridFutureChainListener.java | 30 +- .../internal/util/ipc/IpcToNioAdapter.java | 2 +- .../nio/GridAbstractCommunicationClient.java | 12 +- .../util/nio/GridCommunicationClient.java | 9 +- .../nio/GridConnectionBytesVerifyFilter.java | 15 +- .../util/nio/GridNioAsyncNotifyFilter.java | 10 +- .../internal/util/nio/GridNioCodecFilter.java | 17 +- .../ignite/internal/util/nio/GridNioFilter.java | 16 +- .../internal/util/nio/GridNioFilterAdapter.java | 10 +- 
.../internal/util/nio/GridNioFilterChain.java | 14 +- .../ignite/internal/util/nio/GridNioFuture.java | 4 +- .../util/nio/GridNioRecoveryDescriptor.java | 124 +- .../ignite/internal/util/nio/GridNioServer.java | 1404 +++++++++++++++--- .../internal/util/nio/GridNioSession.java | 25 +- .../internal/util/nio/GridNioSessionImpl.java | 65 +- .../ignite/internal/util/nio/GridNioWorker.java | 48 + .../util/nio/GridSelectorNioSessionImpl.java | 221 ++- .../util/nio/GridShmemCommunicationClient.java | 7 +- .../util/nio/GridTcpNioCommunicationClient.java | 55 +- .../internal/util/nio/SessionWriteRequest.java | 85 ++ .../internal/util/nio/ssl/GridNioSslFilter.java | 10 +- .../util/nio/ssl/GridNioSslHandler.java | 4 +- .../util/tostring/GridToStringBuilder.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 1340 ++++++++++++----- .../tcp/TcpCommunicationSpiMBean.java | 40 + .../ignite/spi/discovery/tcp/ServerImpl.java | 14 +- .../ignite/stream/socket/SocketStreamer.java | 1 + .../ignite/thread/IgniteThreadFactory.java | 8 +- .../IgniteSlowClientDetectionSelfTest.java | 1 + ...unicationBalanceMultipleConnectionsTest.java | 28 + .../IgniteCommunicationBalanceTest.java | 339 +++++ .../communication/IgniteIoTestMessagesTest.java | 95 ++ .../IgniteVariousConnectionNumberTest.java | 166 +++ .../cache/CrossCacheTxRandomOperationsTest.java | 30 +- ...idAbstractCacheInterceptorRebalanceTest.java | 4 +- ...CacheOffHeapMultiThreadedUpdateSelfTest.java | 6 +- ...eAtomicMessageRecovery10ConnectionsTest.java | 28 + ...cMessageRecoveryNoPairedConnectionsTest.java | 47 + ...acheConnectionRecovery10ConnectionsTest.java | 35 + .../distributed/IgniteCacheCreatePutTest.java | 2 +- .../IgniteCacheMessageRecoveryAbstractTest.java | 24 +- .../IgniteCacheMessageWriteTimeoutTest.java | 17 +- .../dht/IgniteCacheMultiTxLockSelfTest.java | 6 +- ...erNoStripedPoolMultiNodeFullApiSelfTest.java | 35 + ...edNoStripedPoolMultiNodeFullApiSelfTest.java | 35 + .../TxDeadlockDetectionNoHangsTest.java | 
2 +- .../TxOptimisticDeadlockDetectionTest.java | 29 +- .../GridServiceProcessorProxySelfTest.java | 2 +- .../util/future/GridFutureAdapterSelfTest.java | 122 +- .../nio/impl/GridNioFilterChainSelfTest.java | 32 +- .../loadtests/nio/GridNioBenchmarkClient.java | 4 +- .../p2p/GridP2PRecursionTaskSelfTest.java | 2 +- .../spi/GridTcpSpiForwardingSelfTest.java | 18 +- .../GridTcpCommunicationSpiAbstractTest.java | 28 +- ...mmunicationSpiConcurrentConnectSelfTest.java | 82 +- .../GridTcpCommunicationSpiConfigSelfTest.java | 5 +- ...cpCommunicationSpiMultithreadedSelfTest.java | 23 +- ...dTcpCommunicationSpiRecoveryAckSelfTest.java | 9 +- ...tionSpiRecoveryFailureDetectionSelfTest.java | 1 + ...ationSpiRecoveryNoPairedConnectionsTest.java | 28 + ...GridTcpCommunicationSpiRecoverySelfTest.java | 67 +- ...CommunicationRecoveryAckClosureSelfTest.java | 9 +- .../junits/GridTestKernalContext.java | 4 +- .../IgniteCacheFullApiSelfTestSuite.java | 6 + .../ignite/testsuites/IgniteCacheTestSuite.java | 17 +- .../IgniteSpiCommunicationSelfTestSuite.java | 2 + .../hadoop/jobtracker/HadoopJobTracker.java | 4 +- .../HadoopExternalCommunication.java | 5 +- .../communication/HadoopIpcToNioAdapter.java | 2 +- .../communication/HadoopMarshallerFilter.java | 6 +- .../ignite/stream/kafka/KafkaStreamer.java | 2 +- .../ignite/tools/classgen/ClassesGenerator.java | 8 +- .../ignite/yardstick/IgniteBenchmarkUtils.java | 6 +- .../yardstick/cache/CacheEntryEventProbe.java | 2 +- .../yardstick/cache/IgniteIoTestBenchmark.java | 73 + .../io/IgniteIoTestAbstractBenchmark.java | 61 + .../io/IgniteIoTestSendAllBenchmark.java | 32 + .../io/IgniteIoTestSendRandomBenchmark.java | 35 + 134 files changed, 5935 insertions(+), 998 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java 
---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java index ad12297..dd8a72b 100644 --- a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java +++ b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java @@ -33,4 +33,4 @@ public class ExampleNodeStartup { public static void main(String[] args) throws IgniteException { Ignition.start("examples/config/example-ignite.xml"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java index 82a76b8..b9bae5b 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java @@ -105,4 +105,4 @@ public class CachePutGetExample { for (Map.Entry<Integer, String> e : vals.entrySet()) System.out.println("Got entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala index 1014726..814bb2e 100644 --- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala +++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala @@ -50,7 +50,7 @@ object ScalarJvmCloudExample { val pool = Executors.newFixedThreadPool(NODES.size) // Concurrently startup all nodes. - NODES.foreach(name => pool.submit(new Runnable { + NODES.foreach(name => pool.execute(new Runnable { @impl def run() { // All defaults. val cfg = new IgniteConfiguration http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java index 0f56c73..c03c06e 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java @@ -111,6 +111,8 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe Map<String, Object> map = client.getBulk("getKey1", "getKey2"); + info("Map: " + map); + Assert.assertEquals(2, map.size()); Assert.assertEquals("getVal1", map.get("getKey1")); @@ -443,4 +445,4 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe return res; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java index c82c73e..9bc4e7f 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.rest.protocols.tcp; import java.net.InetSocketAddress; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.nio.GridNioFinishedFuture; import org.apache.ignite.internal.util.nio.GridNioFuture; @@ -111,6 +112,11 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Override public GridNioFuture<Object> resumeReads() { return null; } @@ -131,12 +137,27 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS } /** {@inheritDoc} */ - @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) { // No-op. } /** {@inheritDoc} */ - @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() { + @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() { return null; } + + /** {@inheritDoc} */ + @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { + return null; + } + + /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + // No-op. 
+ } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index d0c0d5e..9650a31 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -477,6 +477,9 @@ public final class IgniteSystemProperties { @Deprecated public static final String IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES = "IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES"; + /** */ + public static final String IGNITE_IO_BALANCE_PERIOD = "IGNITE_IO_BALANCE_PERIOD"; + /** * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise * the natural order is used. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java index 7494e37..d3f381e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java @@ -153,14 +153,14 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K buf.add(iter.next()); if (buf.size() == batchSize) { - exec.submit(new Worker(c, buf, args)); + exec.execute(new Worker(c, buf, args)); buf = new ArrayList<>(batchSize); } } if (!buf.isEmpty()) - exec.submit(new Worker(c, buf, args)); + exec.execute(new Worker(c, buf, 
args)); } catch (RejectedExecutionException ignored) { // Because of custom RejectedExecutionHandler. @@ -330,4 +330,4 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 75145a3..dcd8a80 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -146,7 +146,7 @@ public class IgniteConfiguration { public static final int AVAILABLE_PROC_CNT = Runtime.getRuntime().availableProcessors(); /** Default core size of public thread pool. */ - public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2; + public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT); /** Default keep alive time for public thread pool. */ @Deprecated @@ -236,6 +236,12 @@ public class IgniteConfiguration { /** Async Callback pool size. */ private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** + * Use striped pool for internal requests processing when possible + * (e.g. cache requests per-partition striping). + */ + private int stripedPoolSize = DFLT_PUBLIC_THREAD_CNT; + /** System pool size. 
*/ private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT; @@ -553,6 +559,7 @@ public class IgniteConfiguration { sndRetryDelay = cfg.getNetworkSendRetryDelay(); sslCtxFactory = cfg.getSslContextFactory(); storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories(); + stripedPoolSize = cfg.getStripedPoolSize(); svcCfgs = cfg.getServiceConfiguration(); sysPoolSize = cfg.getSystemThreadPoolSize(); timeSrvPortBase = cfg.getTimeServerPortBase(); @@ -712,6 +719,47 @@ public class IgniteConfiguration { } /** + * Returns striped pool size that should be used for cache requests + * processing. + * <p> + * If set to non-positive value then requests get processed in system pool. + * <p> + * Striped pool is better for typical cache operations. + * + * @return Positive value if striped pool should be initialized + * with configured number of threads (stripes) and used for requests processing + * or non-positive value to process requests in system pool. + * + * @see #getPublicThreadPoolSize() + * @see #getSystemThreadPoolSize() + */ + public int getStripedPoolSize() { + return stripedPoolSize; + } + + /** + * Sets striped pool size that should be used for cache requests + * processing. + * <p> + * If set to non-positive value then requests get processed in system pool. + * <p> + * Striped pool is better for typical cache operations. + * + * @param stripedPoolSize Positive value if striped pool should be initialized + * with passed in number of threads (stripes) and used for requests processing + * or non-positive value to process requests in system pool. + * @return {@code this} for chaining. + * + * @see #getPublicThreadPoolSize() + * @see #getSystemThreadPoolSize() + */ + public IgniteConfiguration setStripedPoolSize(int stripedPoolSize) { + this.stripedPoolSize = stripedPoolSize; + + return this; + } + + /** * Should return a thread pool size to be used in grid. 
* This executor service will be in charge of processing {@link ComputeJob GridJobs} * and user messages sent to node. http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 715f8a5..68d34ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -181,7 +181,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { notificationQueue.add(new T3<>(nodeId, routineId, evt)); if (!notificationInProgress) { - ctx.getSystemExecutorService().submit(new Runnable() { + ctx.getSystemExecutorService().execute(new Runnable() { @Override public void run() { if (!ctx.continuous().lockStopping()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java index 804d228..dbfa0b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java @@ -217,7 +217,7 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable { assert execSvc != null; - execSvc.submit(new Runnable() { + execSvc.execute(new Runnable() { @Override public void run() { callcc0(); } @@ -300,4 +300,4 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable { 
@Override public String toString() { return S.toString(GridJobContextImpl.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index ae29223..927944f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.IgniteExceptionRegistry; +import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; @@ -511,6 +512,14 @@ public interface GridKernalContext extends Iterable<GridComponent> { public ExecutorService getSystemExecutorService(); /** + * Executor service that is in charge of processing internal system messages + * in stripes (dedicated threads). + * + * @return Thread pool implementation to be used in grid for internal system messages. + */ + public StripedExecutor getStripedExecutorService(); + + /** * Executor service that is in charge of processing internal and Visor * {@link org.apache.ignite.compute.ComputeJob GridJobs}. 
* http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 94c6448..a2ad1b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.IgniteExceptionRegistry; +import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.spring.IgniteSpringHelper; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -300,6 +301,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + protected StripedExecutor stripedExecSvc; + + /** */ + @GridToStringExclude private ExecutorService p2pExecSvc; /** */ @@ -381,6 +386,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param marshCachePool Marshaller cache pool. * @param execSvc Public executor service. * @param sysExecSvc System executor service. + * @param stripedExecSvc Striped executor. * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. 
@@ -400,6 +406,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService marshCachePool, ExecutorService execSvc, ExecutorService sysExecSvc, + StripedExecutor stripedExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, @@ -407,7 +414,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, - List<PluginProvider> plugins) throws IgniteCheckedException { + List<PluginProvider> plugins + ) throws IgniteCheckedException { assert grid != null; assert cfg != null; assert gw != null; @@ -419,6 +427,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable this.marshCachePool = marshCachePool; this.execSvc = execSvc; this.sysExecSvc = sysExecSvc; + this.stripedExecSvc = stripedExecSvc; this.p2pExecSvc = p2pExecSvc; this.mgmtExecSvc = mgmtExecSvc; this.igfsExecSvc = igfsExecSvc; @@ -948,6 +957,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public StripedExecutor getStripedExecutorService() { + return stripedExecSvc; + } + + /** {@inheritDoc} */ @Override public ExecutorService getManagementExecutorService() { return mgmtExecSvc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java index b040a97..5e8e520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java @@ -89,4 
+89,4 @@ public class GridPerformanceSuggestions { @Override public String toString() { return S.toString(GridPerformanceSuggestions.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index b5608db..24ddcd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -97,7 +97,10 @@ public enum GridTopic { TOPIC_QUERY, /** */ - TOPIC_TX; + TOPIC_TX, + + /** */ + TOPIC_IO_TEST; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java index b80a755..789556d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal; +import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; @@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> { public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb); /** + * Make a chained future to convert result of this future (when complete) into a new format. 
+ * It is guaranteed that done callback will be called only ONCE. + * + * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result. + * @param exec Executor to run callback. + * @return Chained future that finishes after this future completes and done callback is called. + */ + public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec); + + /** * @return Error value if future has already been completed with error. */ public Throwable error(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 57aab00..7935e06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -61,10 +61,10 @@ import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteEvents; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteFileSystem; +import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.IgniteQueue; -import org.apache.ignite.IgniteLock; import org.apache.ignite.IgniteScheduler; import org.apache.ignite.IgniteSemaphore; import org.apache.ignite.IgniteServices; @@ -115,7 +115,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.hadoop.Hadoop; -import 
org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; @@ -139,6 +138,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor; import org.apache.ignite.internal.processors.task.GridTaskProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -175,6 +175,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII; @@ -182,7 +183,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALL import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK; import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.snapshot; 
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED; @@ -199,7 +199,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; @@ -208,11 +207,12 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JMX_PORT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_ARGS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_PID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LANG_RUNTIME; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER; -import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM; @@ -663,6 +663,7 @@ public class 
IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @param utilityCachePool Utility cache pool. * @param execSvc Executor service. * @param sysExecSvc System executor service. + * @param stripedExecSvc Striped executor. * @param p2pExecSvc P2P executor service. * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. @@ -673,11 +674,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { * @throws IgniteCheckedException Thrown in case of any errors. */ @SuppressWarnings({"CatchGenericClass", "unchecked"}) - public void start(final IgniteConfiguration cfg, + public void start( + final IgniteConfiguration cfg, ExecutorService utilityCachePool, ExecutorService marshCachePool, final ExecutorService execSvc, final ExecutorService sysExecSvc, + final StripedExecutor stripedExecSvc, ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, @@ -685,7 +688,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, IgniteStripedThreadPoolExecutor callbackExecSvc, - GridAbsClosure errHnd) + GridAbsClosure errHnd + ) throws IgniteCheckedException { gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName())); @@ -785,6 +789,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { marshCachePool, execSvc, sysExecSvc, + stripedExecSvc, p2pExecSvc, mgmtExecSvc, igfsExecSvc, @@ -792,7 +797,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { affExecSvc, idxExecSvc, callbackExecSvc, - plugins); + plugins + ); cfg.getMarshaller().setContext(ctx.marshallerContext()); @@ -986,24 +992,51 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { starveTask = ctx.timeout().schedule(new Runnable() { /** Last completed task count. 
*/ - private long lastCompletedCnt; + private long lastCompletedCntPub; + + /** Last completed task count of the system pool. */ + private long lastCompletedCntSys; @Override public void run() { - if (!(execSvc instanceof ThreadPoolExecutor)) - return; + if (execSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; + + lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub, "public"); + } - ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc; + if (sysExecSvc instanceof ThreadPoolExecutor) { + ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc; + lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system"); + } + + if (stripedExecSvc != null) + stripedExecSvc.checkStarvation(); + } + + /** + * @param exec Thread pool executor to check. + * @param lastCompletedCnt Last completed tasks count. + * @param pool Pool name for message. + * @return Current completed tasks count. + */ + private long checkPoolStarvation( + ThreadPoolExecutor exec, + long lastCompletedCnt, + String pool + ) { long completedCnt = exec.getCompletedTaskCount(); // If all threads are active and no task has completed since last time and there is // at least one waiting request, then it is possible starvation.
if (exec.getPoolSize() == exec.getActiveCount() && completedCnt == lastCompletedCnt && !exec.getQueue().isEmpty()) - LT.warn(log, "Possible thread pool starvation detected (no task completed in last " + - interval + "ms, is executorService pool size large enough?)"); + LT.warn( + log, + "Possible thread pool starvation detected (no task completed in last " + + interval + "ms, is " + pool + " thread pool size large enough?)"); - lastCompletedCnt = completedCnt; + return completedCnt; } }, interval, interval); } @@ -1128,6 +1161,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { }, longOpDumpTimeout, longOpDumpTimeout); } + ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled()); + ctx.performance().logSuggestions(log, gridName); U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}"); @@ -3509,6 +3544,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * @param node Node. + * @param payload Message payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(node, payload, procFromNioThread); + } + + /** + * @param nodes Nodes. + * @param payload Message payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. 
+ */ + public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) { + return ctx.io().sendIoTest(nodes, payload, procFromNioThread); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index b3a9eec..f32a753 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils; import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.spring.IgniteSpringHelper; import org.apache.ignite.internal.util.typedef.CA; import org.apache.ignite.internal.util.typedef.F; @@ -1459,6 +1460,9 @@ public class IgnitionEx { /** System executor service. */ private ThreadPoolExecutor sysExecSvc; + /** */ + private StripedExecutor stripedExecSvc; + /** Management executor service. */ private ThreadPoolExecutor mgmtExecSvc; @@ -1652,8 +1656,6 @@ public class IgnitionEx { execSvc.allowCoreThreadTimeOut(true); - // Note that since we use 'LinkedBlockingQueue', number of - // maximum threads has no effect. 
validateThreadPoolSize(cfg.getSystemThreadPoolSize(), "system"); sysExecSvc = new IgniteThreadPoolExecutor( @@ -1666,6 +1668,9 @@ public class IgnitionEx { sysExecSvc.allowCoreThreadTimeOut(true); + if (cfg.getStripedPoolSize() > 0) + stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getGridName(), "sys", log); + // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. // Note, that we do not pre-start threads here as management pool may @@ -1791,13 +1796,26 @@ public class IgnitionEx { // Init here to make grid available to lifecycle listeners. grid = grid0; - grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, - igfsExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc, + grid0.start( + myCfg, + utilityCacheExecSvc, + marshCacheExecSvc, + execSvc, + sysExecSvc, + stripedExecSvc, + p2pExecSvc, + mgmtExecSvc, + igfsExecSvc, + restExecSvc, + affExecSvc, + idxExecSvc, + callbackExecSvc, new CA() { @Override public void apply() { startLatch.countDown(); } - }); + } + ); state = STARTED; @@ -2415,6 +2433,10 @@ public class IgnitionEx { sysExecSvc = null; + U.shutdownNow(getClass(), stripedExecSvc, log); + + stripedExecSvc = null; + U.shutdownNow(getClass(), mgmtExecSvc, log); mgmtExecSvc = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java index 6ea7c22..12baee0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java @@ -200,6 +200,7 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo .socketSendBufferSize(0) .idleTimeout(Long.MAX_VALUE) .gridName(routerClient ? "routerClient" : "gridClient") + .serverName("tcp-client") .daemon(cfg.isDaemon()) .build(); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java index 06a4929..3566830 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java @@ -258,6 +258,7 @@ public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean, Lif .logger(log) .selectorCount(Runtime.getRuntime().availableProcessors()) .gridName(gridName) + .serverName("router") .tcpNoDelay(tcpNoDelay) .directBuffer(false) .byteOrder(ByteOrder.nativeOrder()) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3df29cf..7ef7bc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -26,15 +26,17 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -44,6 +46,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -55,6 +58,8 @@ import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -83,6 +88,7 @@ 
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; +import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; @@ -176,6 +182,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Stopping flag. */ private boolean stopping; + /** */ + private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>(); + + /** */ + private final AtomicLong ioTestId = new AtomicLong(); + /** * @param ctx Grid kernal context. */ @@ -297,6 +309,114 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (log.isDebugEnabled()) log.debug(startInfo()); + + addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + return; + + IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg; + + if (msg0.request()) { + IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null); + + res.flags(msg0.flags()); + + try { + send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e); + } + } + else { + IoTestFuture fut = ioTestMap().get(msg0.id()); + + if (fut == null) + U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']'); + else + fut.onResponse(); + } + } + }); + } + + /** + * @param nodes Nodes. + * @param payload Payload. 
+ * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) { + long id = ioTestId.getAndIncrement(); + + IoTestFuture fut = new IoTestFuture(id, nodes.size()); + + IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + + msg.processFromNioThread(procFromNioThread); + + ioTestMap().put(id, fut); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + try { + send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + ioTestMap().remove(msg.id()); + + return new GridFinishedFuture(e); + } + } + + return fut; + } + + /** + * @param node Node. + * @param payload Payload. + * @param procFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + long id = ioTestId.getAndIncrement(); + + IoTestFuture fut = new IoTestFuture(id, 1); + + IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + + msg.processFromNioThread(procFromNioThread); + + ioTestMap().put(id, fut); + + try { + send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + ioTestMap().remove(msg.id()); + + return new GridFinishedFuture(e); + } + + return fut; + } + + /** + * @return IO test futures map. + */ + private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() { + ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get(); + + if (map == null) { + if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>())) + map = ioTestMap.get(); + } + + return map; } /** {@inheritDoc} */ @@ -514,16 +634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } - // Check discovery. 
- ClusterNode node = ctx.discovery().node(nodeId); - - if (node == null) { - if (log.isDebugEnabled()) - log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']'); - - return; // We can't receive messages from non-discovered ones. - } - if (msg.topic() == null) { int topicOrd = msg.topicOrdinal(); @@ -678,8 +788,31 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa msgC.run(); } } + + @Override public String toString() { + return "Message closure [msg=" + msg + ']'; + } }; + if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { + IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); + + if (msg0.processFromNioThread()) { + c.run(); + + return; + } + } + + if (ctx.config().getStripedPoolSize() > 0 && + plc == GridIoPolicy.SYSTEM_POOL && + msg.partition() != Integer.MIN_VALUE + ) { + ctx.getStripedExecutorService().execute(msg.partition(), c); + + return; + } + try { pools.poolForPolicy(plc).execute(c); } @@ -2460,4 +2593,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(DelayedMessage.class, this, super.toString()); } } + + /** + * + */ + private class IoTestFuture extends GridFutureAdapter<Object> { + /** */ + private final long id; + + /** */ + private int cntr; + + /** + * @param id ID. + * @param cntr Counter. 
+ */ + IoTestFuture(long id, int cntr) { + assert cntr > 0 : cntr; + + this.id = id; + this.cntr = cntr; + } + + /** + * + */ + void onResponse() { + boolean complete; + + synchronized (this) { + complete = --cntr == 0; + } + + if (complete) + onDone(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + ioTestMap().remove(id); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IoTestFuture.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index b28ced2..b1a26e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.communication; import java.io.Externalizable; import java.nio.ByteBuffer; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -321,6 +322,18 @@ public class GridIoMessage implements Message { return 7; } + /** + * Get single partition for this message (if applicable). + * + * @return Partition ID. 
+ */ + public int partition() { + if (msg instanceof GridCacheMessage) + return ((GridCacheMessage)msg).partition(); + else + return Integer.MIN_VALUE; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridIoMessage.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 504e683..b1fe910 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -171,6 +171,16 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -44: + msg = new TcpCommunicationSpi.HandshakeMessage2(); + + break; + + case -43: + msg = new IgniteIoTestMessage(); + + break; + case -42: msg = new HadoopDirectShuffleMessage(); @@ -816,7 +826,7 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..127] [-36]- this + // [-3..119] [124..127] [-36..-44]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java new file mode 100644 index 0000000..77aaa09 --- 
/dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * + */ +public class IgniteIoTestMessage implements Message { + /** */ + private static byte FLAG_PROC_FROM_NIO = 1; + + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long id; + + /** */ + private byte flags; + + /** */ + private boolean req; + + /** */ + private byte payload[]; + + /** + * + */ + IgniteIoTestMessage() { + // No-op. + } + + /** + * @param id Message ID. + * @param req Request flag. + * @param payload Payload. 
+ */ + IgniteIoTestMessage(long id, boolean req, byte[] payload) { + this.id = id; + this.req = req; + this.payload = payload; + } + + /** + * @return {@code True} if message should be processed from NIO thread + * (otherwise message is submitted to system pool). + */ + boolean processFromNioThread() { + return isFlag(FLAG_PROC_FROM_NIO); + } + + /** + * @param procFromNioThread {@code True} if message should be processed from NIO thread. + */ + void processFromNioThread(boolean procFromNioThread) { + setFlag(procFromNioThread, FLAG_PROC_FROM_NIO); + } + + /** + * @param flags Flags. + */ + public void flags(byte flags) { + this.flags = flags; + } + + /** + * @return Flags. + */ + public byte flags() { + return flags; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** + * @return {@code true} if this is request. + */ + public boolean request() { + return req; + } + + /** + * @return ID. 
+ */ + public long id() { + return id; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("id", id)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeByteArray("payload", payload)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeBoolean("req", req)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + id = reader.readLong("id"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + payload = reader.readByteArray("payload"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + req = reader.readBoolean("req"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(IgniteIoTestMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -43; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteIoTestMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 2e24e67..a8d9f1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Asynchronous operations limit semaphore. */ private Semaphore asyncOpsSem; + /** */ + protected volatile boolean asyncToggled; + /** {@inheritDoc} */ @Override public String name() { return cacheCfg.getName(); @@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Toggles async flag if someone calls {@code withAsync()} + * on proxy and since that we have to properly handle all cache + * operations (sync and async) to put them in proper sequence. + * + * TODO: https://issues.apache.org/jira/browse/IGNITE-4393 + */ + void toggleAsync() { + if (!asyncToggled) + asyncToggled = true; + } + + /** * Prints memory stats. */ public void printMemoryStats() { @@ -1134,7 +1149,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V execSvc = Executors.newFixedThreadPool(jobs.size() - 1); for (int i = 1; i < jobs.size(); i++) - execSvc.submit(jobs.get(i)); + execSvc.execute(jobs.get(i)); } try { @@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Put future. 
*/ public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { + A.notNull(key, "key", val, "val"); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, @Nullable final CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - if (keyCheck) validateCacheKey(key); @@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Failed future if waiting was interrupted. */ @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() { + if (!asyncToggled) + return null; + try { if (asyncOpsSem != null) asyncOpsSem.acquire(); @@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Releases asynchronous operations permit, if limited. 
*/ protected void asyncOpRelease() { - if (asyncOpsSem != null) + if (asyncOpsSem != null && asyncToggled) asyncOpsSem.release(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index 71f99d3..0646d5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -129,6 +129,13 @@ public abstract class GridCacheMessage implements Message { } /** + * @return Partition ID this message is targeted to or {@code -1} if it cannot be determined. + */ + public int partition() { + return -1; + } + + /** * If class loading error occurred during unmarshalling and {@link #ignoreClassErrors()} is * set to {@code true}, then the error will be passed into this method. * http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 90e428c..3178203 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -110,6 +110,41 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA * Cache utility methods. 
*/ public class GridCacheUtils { + /** Cheat cache ID for debugging and benchmarking purposes. */ + public static final int cheatCacheId; + + /* + * + */ + static { + String cheatCache = System.getProperty("CHEAT_CACHE"); + + if (cheatCache != null) { + cheatCacheId = cheatCache.hashCode(); + + if (cheatCacheId == 0) + throw new RuntimeException(); + + System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']'); + } + else + cheatCacheId = 0; + } + + /** + * Quickly checks if passed in cache ID is a "cheat cache ID" set by -DCHEAT_CACHE=user_cache_name + * and resolved in static block above. + * + * FOR DEBUGGING AND TESTING PURPOSES! + * + * @param id Cache ID to check. + * @return {@code True} if this is cheat cache ID. + */ + @Deprecated + public static boolean cheatCache(int id) { + return cheatCacheId != 0 && id == cheatCacheId; + } + /** Hadoop syste cache name. */ public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache"; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f87fa1d..b9e6e82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAsync() { + if (delegate instanceof GridCacheAdapter) + ((GridCacheAdapter)delegate).toggleAsync(); + + return super.withAsync(); + } + + /** {@inheritDoc} */ @Override public 
IgniteCache<K, V> withSkipStore() { return skipStore(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 9639a9a..a671296 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -310,6 +310,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { return keys; } + /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + /** * @return Max lock wait time. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java index 109d665..c5cf332 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -86,6 +86,12 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public int partition() { + // TODO https://issues.apache.org/jira/browse/IGNITE-4371 + return Integer.MIN_VALUE; + } + + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java index df6acdd..5d70ec1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java @@ -89,6 +89,11 @@ public class GridDistributedUnlockRequest extends 
GridDistributedBaseMessage { partIds.add(key.partition()); } + /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + /** {@inheritDoc} * @param ctx*/ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 35e6267..519d0fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -218,7 +218,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); - preldr.onKernalStart(); + if (preldr != null) + preldr.onKernalStart(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java index 1e92b54..63e3309 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; @@ -31,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -57,7 +57,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse { /** Invalid partitions. */ @GridToStringInclude @GridDirectCollection(int.class) - private Collection<Integer> invalidParts = new GridLeanSet<>(); + private Collection<Integer> invalidParts; /** Preload entries. */ @GridDirectCollection(GridCacheEntryInfo.class) @@ -127,6 +127,9 @@ public class GridDhtLockResponse extends GridDistributedLockResponse { * @param part Invalid partition. */ public void addInvalidPartition(int part) { + if (invalidParts == null) + invalidParts = new HashSet<>(); + invalidParts.add(part); } @@ -134,7 +137,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse { * @return Invalid partitions. */ public Collection<Integer> invalidPartitions() { - return invalidParts; + return invalidParts == null ? 
Collections.<Integer>emptySet() : invalidParts; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 940c74e..0e60ff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - return updateAsync0( key, val, @@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { + if (!asyncToggled) + return op.apply(); + IgniteInternalFuture<T> fail = asyncOpAcquire(); if (fail != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index a03d948..0af7cf5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -235,6 +235,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** {@inheritDoc} */ @Override public int partitionId(int idx) { assert idx == 0 : idx; http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index f2fbb0e..1854e52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -487,6 +487,11 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } /** {@inheritDoc} */ + @Override public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + + /** {@inheritDoc} */ @Override public long conflictExpireTime(int idx) { if (conflictExpireTimes != null) { assert idx >= 0 && idx < conflictExpireTimes.size();
