IGNITE-5658 Optimizations for data streamer (cherry picked from commit aa81dd1)
(cherry picked from commit 1ad4f14) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/212603e1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/212603e1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/212603e1 Branch: refs/heads/ignite-2.1.5-p1 Commit: 212603e1a969b9320f40207cd9233bed9152b3e4 Parents: 98afbfd Author: Igor Seliverstov <gvvinbl...@gmail.com> Authored: Wed Aug 9 19:29:39 2017 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Fri Sep 22 15:28:11 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteDataStreamer.java | 26 +- .../apache/ignite/IgniteSystemProperties.java | 12 +- .../ignite/internal/GridKernalContext.java | 2 +- .../ignite/internal/GridKernalContextImpl.java | 6 +- .../apache/ignite/internal/IgniteKernal.java | 5 +- .../ignite/internal/IgniteNodeAttributes.java | 3 + .../org/apache/ignite/internal/IgnitionEx.java | 23 +- .../ignite/internal/jdbc2/JdbcConnection.java | 5 +- .../managers/communication/GridIoManager.java | 6 + .../managers/communication/GridIoMessage.java | 3 + .../cache/persistence/freelist/PagesList.java | 67 +++-- .../wal/reader/StandaloneGridKernalContext.java | 2 +- .../datastreamer/DataStreamerImpl.java | 281 ++++++++++++------- .../datastreamer/DataStreamerRequest.java | 59 +++- .../ignite/internal/util/StripedExecutor.java | 89 +++++- .../org/apache/ignite/thread/IgniteThread.java | 10 + .../DataStreamProcessorSelfTest.java | 14 +- .../datastreamer/DataStreamerImplSelfTest.java | 3 +- 18 files changed, 424 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index e2473dc..b1f5851 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -20,6 +20,7 @@ package org.apache.ignite; import java.util.Collection; import java.util.Map; import javax.cache.CacheException; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.stream.StreamReceiver; @@ -72,7 +73,7 @@ import org.jetbrains.annotations.Nullable; * this setting limits maximum allowed number of parallel buffered stream messages that * are being processed on remote nodes. If this number is exceeded, then * {@link #addData(Object, Object)} method will block to control memory utilization. - * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value. + * Default is equal to CPU count on remote node multiply by {@link #DFLT_PARALLEL_OPS_MULTIPLIER}. * </li> * <li> * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially, @@ -100,11 +101,23 @@ import org.jetbrains.annotations.Nullable; * </ul> */ public interface IgniteDataStreamer<K, V> extends AutoCloseable { - /** Default max concurrent put operations count. */ + /** + * Default max concurrent put operations count. + * @deprecated Is not used anymore. + */ + @Deprecated public static final int DFLT_MAX_PARALLEL_OPS = 16; - /** Default per node buffer size. */ - public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; + /** + * Default multiplier for data streamer pool size to get concurrent batches count for each remote node. + * + * @see IgniteConfiguration#getDataStreamerThreadPoolSize() + * @see #perNodeParallelOperations() + */ + public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 8; + + /** Default operations batch size to sent to remote node for loading. */ + public static final int DFLT_PER_NODE_BUFFER_SIZE = 512; /** Default timeout for streamer's operations. */ public static final long DFLT_UNLIMIT_TIMEOUT = -1; @@ -203,9 +216,11 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { * <p> * This method should be called prior to {@link #addData(Object, Object)} call. * <p> - * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. + * If not provided, default value is calculated as follows + * {@link #DFLT_PARALLEL_OPS_MULTIPLIER} * {@code DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE}. * * @param parallelOps Maximum number of parallel stream operations for a single node. + * @see IgniteConfiguration#getDataStreamerThreadPoolSize() */ public void perNodeParallelOperations(int parallelOps); @@ -450,5 +465,4 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable { */ @Override public void close() throws CacheException, IgniteInterruptedException, IgniteDataStreamerTimeoutException; - } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index f627e24..ec79026 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Properties; import javax.net.ssl.HostnameVerifier; import org.apache.ignite.cluster.ClusterGroup; -import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -707,9 +706,12 @@ public final class IgniteSystemProperties { public static final String IGNITE_ENABLE_FORCIBLE_NODE_KILL = "IGNITE_ENABLE_FORCIBLE_NODE_KILL"; /** - * If this property is set, then Ignite will use Async File IO factory by default. + * Tasks stealing will be started if tasks queue size per data-streamer thread exceeds this threshold. + * <p> + * Default value is {@code 4}. */ - public static final String IGNITE_USE_ASYNC_FILE_IO_FACTORY = "IGNITE_USE_ASYNC_FILE_IO_FACTORY"; + public static final String IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD = + "IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD"; /** * If the property is set {@link org.apache.ignite.internal.pagemem.wal.record.TxRecord} records @@ -719,10 +721,6 @@ public final class IgniteSystemProperties { */ public static final String IGNITE_WAL_LOG_TX_RECORDS = "IGNITE_WAL_LOG_TX_RECORDS"; - /** If this property is set, {@link PersistentStoreConfiguration#writeThrottlingEnabled} will be overridden to true - * independent of initial value in configuration. */ - public static final String IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED = "IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED"; - /** * Enforces singleton. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 93ae465..99c7cce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -531,7 +531,7 @@ public interface GridKernalContext extends Iterable<GridComponent> { * * @return Thread pool implementation to be used for data stream messages. */ - public ExecutorService getDataStreamerExecutorService(); + public StripedExecutor getDataStreamerExecutorService(); /** * Should return an instance of fully configured thread pool to be used for http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 0c80eae..07e5970 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -316,7 +316,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude - private ExecutorService dataStreamExecSvc; + private StripedExecutor dataStreamExecSvc; /** */ @GridToStringExclude @@ -422,7 +422,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, - ExecutorService dataStreamExecSvc, + StripedExecutor dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -975,7 +975,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public ExecutorService getDataStreamerExecutorService() { + @Override public StripedExecutor getDataStreamerExecutorService() { return dataStreamExecSvc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b7430da..3ed6447 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -213,6 +213,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS; @@ -733,7 +734,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ExecutorService p2pExecSvc, ExecutorService mgmtExecSvc, ExecutorService igfsExecSvc, - ExecutorService dataStreamExecSvc, + StripedExecutor dataStreamExecSvc, ExecutorService restExecSvc, ExecutorService affExecSvc, @Nullable ExecutorService idxExecSvc, @@ -1457,6 +1458,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { */ @SuppressWarnings({"SuspiciousMethodCalls", "unchecked", "TypeMayBeWeakened"}) private void fillNodeAttributes(boolean notifyEnabled) throws IgniteCheckedException { + ctx.addNodeAttribute(ATTR_DATA_STREAMER_POOL_SIZE, configuration().getDataStreamerThreadPoolSize()); + final String[] incProps = cfg.getIncludeProperties(); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index e4ed44a..024f339 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -180,6 +180,9 @@ public final class IgniteNodeAttributes { /** Ignite security compatibility mode. */ public static final String ATTR_SECURITY_COMPATIBILITY_MODE = ATTR_PREFIX + ".security.compatibility.enabled"; + /** */ + public static final String ATTR_DATA_STREAMER_POOL_SIZE = ATTR_PREFIX + ".data.streamer.pool.size"; + /** Memory configuration. */ public static final String ATTR_MEMORY_CONFIG = ATTR_PREFIX + ".memory"; http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index 23baeb3..07a5c43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@ -1508,7 +1508,7 @@ public class IgnitionEx { private ThreadPoolExecutor igfsExecSvc; /** Data streamer executor service. */ - private ThreadPoolExecutor dataStreamerExecSvc; + private StripedExecutor dataStreamerExecSvc; /** REST requests executor service. */ private ThreadPoolExecutor restExecSvc; @@ -1728,7 +1728,11 @@ public class IgnitionEx { validateThreadPoolSize(cfg.getStripedPoolSize(), "stripedPool"); - stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getIgniteInstanceName(), "sys", log); + stripedExecSvc = new StripedExecutor( + cfg.getStripedPoolSize(), + cfg.getIgniteInstanceName(), + "sys", + log); // Note that since we use 'LinkedBlockingQueue', number of // maximum threads has no effect. @@ -1763,17 +1767,12 @@ public class IgnitionEx { p2pExecSvc.allowCoreThreadTimeOut(true); - // Note that we do not pre-start threads here as this pool may not be needed. - dataStreamerExecSvc = new IgniteThreadPoolExecutor( - "data-streamer", - cfg.getIgniteInstanceName(), - cfg.getDataStreamerThreadPoolSize(), + dataStreamerExecSvc = new StripedExecutor( cfg.getDataStreamerThreadPoolSize(), - DFLT_THREAD_KEEP_ALIVE_TIME, - new LinkedBlockingQueue<Runnable>(), - GridIoPolicy.DATA_STREAMER_POOL); - - dataStreamerExecSvc.allowCoreThreadTimeOut(true); + cfg.getIgniteInstanceName(), + "data-streamer", + log, + true); // Note that we do not pre-start threads here as igfs pool may not be needed. validateThreadPoolSize(cfg.getIgfsThreadPoolSize(), "IGFS"); http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 6c8fc0b..6b8371d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -199,8 +199,9 @@ public class JdbcConnection implements Connection { streamFlushTimeout = Long.parseLong(props.getProperty(PROP_STREAMING_FLUSH_FREQ, "0")); streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE, String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE))); - streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, - String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS))); + // If value is zero, server data-streamer pool size multiplied + // by IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER will be used + streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, "0")); String nodeIdProp = props.getProperty(PROP_NODE_ID); http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index bb36b26..2005032 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1125,6 +1125,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return; } + if (plc == GridIoPolicy.DATA_STREAMER_POOL && msg.partition() != GridIoMessage.STRIPE_DISABLED_PART) { + ctx.getDataStreamerExecutorService().execute(msg.partition(), c); + + return; + } + if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index dccd336..fe61aec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.internal.ExecutorAwareMessage; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -336,6 +337,8 @@ public class GridIoMessage implements Message { public int partition() { if (msg instanceof GridCacheMessage) return ((GridCacheMessage)msg).partition(); + if (msg instanceof DataStreamerRequest) + return ((DataStreamerRequest)msg).partition(); else return STRIPE_DISABLED_PART; } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java index 39a6865..6c355f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; import static java.lang.Boolean.FALSE; @@ -66,7 +67,7 @@ public abstract class PagesList extends DataStructure { /** */ private static final int MAX_STRIPES_PER_BUCKET = IgniteSystemProperties.getInteger("IGNITE_PAGES_LIST_STRIPES_PER_BUCKET", - Math.min(8, Runtime.getRuntime().availableProcessors() * 2)); + Math.max(8, Runtime.getRuntime().availableProcessors())); /** */ protected final AtomicLong[] bucketsSize; @@ -507,6 +508,21 @@ public abstract class PagesList extends DataStructure { * @throws IgniteCheckedException If failed. */ private Stripe getPageForPut(int bucket) throws IgniteCheckedException { + // Striped pool optimization. + int stripeIdx; IgniteThread igniteThread = IgniteThread.current(); + + if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) { + Stripe[] tails = getBucket(bucket); + + while (tails == null || stripeIdx >= tails.length) { + addStripe(bucket, true); + + tails = getBucket(bucket); + } + + return tails[stripeIdx]; + } + Stripe[] tails = getBucket(bucket); if (tails == null) @@ -607,12 +623,8 @@ public abstract class PagesList extends DataStructure { try { long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check. - if (tailAddr == 0L) { - if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS) - addStripeForReuseBucket(bucket); - + if (tailAddr == 0L) continue; - } assert PageIO.getPageId(tailAddr) == tailId : "pageId = " + PageIO.getPageId(tailAddr) + ", tailId = " + tailId; assert PageIO.getType(tailAddr) == PageIO.T_PAGE_LIST_NODE; @@ -912,13 +924,26 @@ public abstract class PagesList extends DataStructure { * @param bucket Bucket index. * @return Page for take. */ - private Stripe getPageForTake(int bucket) { + private Stripe getPageForTake(int bucket) throws IgniteCheckedException { Stripe[] tails = getBucket(bucket); if (tails == null || bucketsSize[bucket].get() == 0) return null; int len = tails.length; + + // Striped pool optimization. + int stripeIdx; IgniteThread igniteThread = IgniteThread.current(); + + if (igniteThread != null && (stripeIdx = igniteThread.stripe()) != -1) { + if (stripeIdx >= len) + return null; + + Stripe stripe = tails[stripeIdx]; + + return stripe.empty ? null : stripe; + } + int init = randomInt(len); int cur = init; @@ -943,6 +968,12 @@ public abstract class PagesList extends DataStructure { */ private long writeLockPage(long pageId, long page, int bucket, int lockAttempt) throws IgniteCheckedException { + // Striped pool optimization. + IgniteThread igniteThread = IgniteThread.current(); + + if (igniteThread != null && igniteThread.stripe() != -1) + return writeLock(pageId, page); + long pageAddr = tryWriteLock(pageId, page); if (pageAddr != 0L) @@ -952,8 +983,7 @@ public abstract class PagesList extends DataStructure { Stripe[] stripes = getBucket(bucket); if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) { - if (!isReuseBucket(bucket)) - addStripe(bucket, true); + addStripe(bucket, !isReuseBucket(bucket)); return 0L; } @@ -963,19 +993,6 @@ public abstract class PagesList extends DataStructure { } /** - * @param bucket Bucket. - * @throws IgniteCheckedException If failed. - */ - private void addStripeForReuseBucket(int bucket) throws IgniteCheckedException { - assert isReuseBucket(bucket); - - Stripe[] stripes = getBucket(bucket); - - if (stripes == null || stripes.length < MAX_STRIPES_PER_BUCKET) - addStripe(bucket, false); - } - - /** * @param bucket Bucket index. * @param initIoVers Optional IO to initialize page. * @return Removed page ID. @@ -994,12 +1011,8 @@ public abstract class PagesList extends DataStructure { try { long tailAddr = writeLockPage(tailId, tailPage, bucket, lockAttempt++); // Explicit check. - if (tailAddr == 0L) { - if (isReuseBucket(bucket) && lockAttempt == TRY_LOCK_ATTEMPTS) - addStripeForReuseBucket(bucket); - + if (tailAddr == 0L) continue; - } if (stripe.empty) { // Another thread took the last page. http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 9dfd338..07be8b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -513,7 +513,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ - @Override public ExecutorService getDataStreamerExecutorService() { + @Override public StripedExecutor getDataStreamerExecutorService() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 6681710..1869dcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.datastreamer; +import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -60,8 +61,10 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -152,7 +155,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; /** */ - private int parallelOps = DFLT_MAX_PARALLEL_OPS; + private int parallelOps; /** */ private long timeout = DFLT_UNLIMIT_TIMEOUT; @@ -794,6 +797,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed initPda = false; } + if (key.partition() == -1) + key.partition(cctx.affinity().partition(key, false)); + nodes = nodes(key, topVer, cctx); } catch (IgniteCheckedException e) { @@ -935,10 +941,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } }; - final GridFutureAdapter<?> f; + final List<GridFutureAdapter<?>> futs; try { - f = buf.update(entriesForNode, topVer, lsnr, remap); + futs = buf.update(entriesForNode, topVer, lsnr, remap); } catch (IgniteInterruptedCheckedException e1) { resFut.onDone(e1); @@ -954,9 +960,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed @Override public void run() { buf0.onNodeLeft(); - if (f != null) - f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " + - "(node has left): " + nodeId)); + if (futs != null) { + Throwable ex = new ClusterTopologyCheckedException( + "Failed to wait for request completion (node has left): " + nodeId); + + for (int i = 0; i < futs.size(); i++) + futs.get(i).onDone(ex); + } } }, ctx.discovery().topologyVersion(), false); } @@ -1314,11 +1324,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private final Collection<IgniteInternalFuture<Object>> locFuts; /** Buffered entries. */ - private List<DataStreamerEntry> entries; - - /** */ - @GridToStringExclude - private GridFutureAdapter<Object> curFut; + private final PerStripeBuffer[] stripes; /** Local node flag. */ private final boolean isLocNode; @@ -1332,16 +1338,17 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** */ private final Semaphore sem; - /** Batch topology. */ - private AffinityTopologyVersion batchTopVer; + /** */ + private final int perNodeParallelOps; /** Closure to signal on task finish. */ @GridToStringExclude - private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() { - @Override public void apply(IgniteInternalFuture<Object> t) { - signalTaskFinished(t); - } - }; + private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = + new IgniteInClosure<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> t) { + signalTaskFinished(t); + } + }; /** * @param node Node. @@ -1357,24 +1364,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Cache local node flag. isLocNode = node.equals(ctx.discovery().localNode()); - entries = newEntries(); - curFut = new GridFutureAdapter<>(); - curFut.listen(signalC); + Integer attrStreamerPoolSize = node.attribute(IgniteNodeAttributes.ATTR_DATA_STREAMER_POOL_SIZE); - sem = new Semaphore(parallelOps); - } + int streamerPoolSize = attrStreamerPoolSize != null ? attrStreamerPoolSize : node.metrics().getTotalCpus(); - /** - * @param remap Remapping flag. - */ - private void renewBatch(boolean remap) { - entries = newEntries(); - curFut = new GridFutureAdapter<>(); + perNodeParallelOps = parallelOps != 0 ? parallelOps : + streamerPoolSize * IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER; - batchTopVer = null; + sem = new Semaphore(perNodeParallelOps); - if (!remap) - curFut.listen(signalC); + stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, streamerPoolSize); + + for (int i = 0; i < stripes.length; i++) + stripes[i] = new PerStripeBuffer(i, signalC); } /** @@ -1385,61 +1387,69 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @return Future for operation. * @throws IgniteInterruptedCheckedException If failed. */ - @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry> newEntries, + @Nullable List<GridFutureAdapter<?>> update( + Iterable<DataStreamerEntry> newEntries, AffinityTopologyVersion topVer, IgniteInClosure<IgniteInternalFuture<?>> lsnr, - boolean remap) throws IgniteInterruptedCheckedException { - List<DataStreamerEntry> entries0 = null; + boolean remap + ) throws IgniteInterruptedCheckedException { + List<GridFutureAdapter<?>> res = null; - GridFutureAdapter<Object> curFut0; + for (DataStreamerEntry entry : newEntries) { + List<DataStreamerEntry> entries0 = null; + AffinityTopologyVersion curBatchTopVer; + + // Init buffer. + int part = entry.getKey().partition(); - AffinityTopologyVersion curBatchTopVer; + GridFutureAdapter<Object> curFut0; + PerStripeBuffer b = stripes[part % stripes.length]; - synchronized (this) { - curFut0 = curFut; + synchronized (b) { + curFut0 = b.curFut; - curFut0.listen(lsnr); + // Listener should be added only once per whole entries collection. + // Should we simplify the model and get rid of all futures? + curFut0.listen(lsnr); - if (batchTopVer == null) - batchTopVer = topVer; + if (b.batchTopVer == null) + b.batchTopVer = topVer; - curBatchTopVer = batchTopVer; + curBatchTopVer = b.batchTopVer; - for (DataStreamerEntry entry : newEntries) - entries.add(entry); + b.entries.add(entry); - if (entries.size() >= bufSize) { - entries0 = entries; + if (b.entries.size() >= bufSize) { + entries0 = b.entries; - renewBatch(remap); + b.renewBatch(remap); + } } - } - if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { - renewBatch(remap); + if (res == null) + res = new ArrayList<>(); - curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." + - "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]")); - } - else if (entries0 != null) { - submit(entries0, curBatchTopVer, curFut0, remap); - - if (cancelled) - curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + - DataStreamerImpl.this)); - else if (ctx.clientDisconnected()) - curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), - "Client node disconnected.")); - } + res.add(curFut0); - return curFut0; - } + if (!allowOverwrite() && !topVer.equals(curBatchTopVer)) { + b.renewBatch(remap); - /** - * @return Fresh collection with some space for outgrowth. - */ - private List<DataStreamerEntry> newEntries() { - return new ArrayList<>((int)(bufSize * 1.2)); + curFut0.onDone(null, new IgniteCheckedException("Topology changed during batch preparation." + + "[batchTopVer=" + curBatchTopVer + ", topVer=" + topVer + "]")); + } + else if (entries0 != null) { + submit(entries0, curBatchTopVer, curFut0, remap, b.partId); + + if (cancelled) + curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + + DataStreamerImpl.this)); + else if (ctx.clientDisconnected()) + curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Client node disconnected.")); + } + } + + return res; } /** @@ -1447,24 +1457,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @throws IgniteInterruptedCheckedException If thread has been interrupted. */ @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException { - List<DataStreamerEntry> entries0 = null; - GridFutureAdapter<Object> curFut0 = null; - acquireRemapSemaphore(); - synchronized (this) { - if (!entries.isEmpty()) { - entries0 = entries; - curFut0 = curFut; + for (PerStripeBuffer b : stripes) { + AffinityTopologyVersion batchTopVer = null; + List<DataStreamerEntry> entries0 = null; + GridFutureAdapter<Object> curFut0 = null; + + synchronized (b) { + if (!b.entries.isEmpty()) { + entries0 = b.entries; + curFut0 = b.curFut; + batchTopVer = b.batchTopVer; - entries = newEntries(); - curFut = new GridFutureAdapter<>(); - curFut.listen(signalC); + b.renewBatch(false); + } } - } - if (entries0 != null) - submit(entries0, batchTopVer, curFut0, false); + if (entries0 != null) + submit(entries0, batchTopVer, curFut0, false, b.partId); + } // Create compound future for this flush. GridCompoundFuture<Object, Object> res = null; @@ -1618,13 +1630,16 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed * @param topVer Topology version. * @param curFut Current future. * @param remap Remapping flag. + * @param partId Partition ID. * @throws IgniteInterruptedCheckedException If interrupted. */ - private void submit(final Collection<DataStreamerEntry> entries, + private void submit( + final Collection<DataStreamerEntry> entries, @Nullable AffinityTopologyVersion topVer, final GridFutureAdapter<Object> curFut, - boolean remap) - throws IgniteInterruptedCheckedException { + boolean remap, + int partId + ) throws IgniteInterruptedCheckedException { assert entries != null; assert !entries.isEmpty(); assert curFut != null; @@ -1685,7 +1700,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed cache.context().deploy().onEnter(); } catch (IgniteCheckedException e) { - U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e); + U.error(log, "Failed to deploy class (request will not be sent): " + + jobPda0.deployClass(), e); return; } @@ -1718,7 +1734,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed dep != null ? dep.participants() : null, dep != null ? dep.classLoaderId() : null, dep == null, - topVer); + topVer, + (rcvr == ISOLATED_UPDATER) ? + partId : GridIoMessage.STRIPE_DISABLED_PART); try { ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, plc); @@ -1767,11 +1785,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed // Make sure to complete current future. GridFutureAdapter<Object> curFut0; - synchronized (this) { - curFut0 = curFut; - } + for (PerStripeBuffer b : stripes) { + synchronized (b) { + curFut0 = b.curFut; + } - curFut0.onDone(e); + curFut0.onDone(e); + } } /** @@ -1845,10 +1865,14 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** {@inheritDoc} */ @Override public String toString() { - int size; + int size = 0; - synchronized (this) { - size = entries.size(); + for (int i = 0; i < stripes.length; i++) { + PerStripeBuffer b = stripes[i]; + + synchronized (b) { + size += b.entries.size(); + } } return S.toString(Buffer.class, this, @@ -1937,8 +1961,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed private static final long serialVersionUID = 0L; /** {@inheritDoc} */ - @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache, - Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) { + @Override public void receive( + IgniteCache<KeyCacheObject, CacheObject> cache, + Collection<Map.Entry<KeyCacheObject, CacheObject>> entries + ) { IgniteCacheProxy<KeyCacheObject, CacheObject> proxy = (IgniteCacheProxy<KeyCacheObject, CacheObject>)cache; GridCacheAdapter<KeyCacheObject, CacheObject> internalCache = proxy.context().cache(); @@ -2097,4 +2123,63 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return S.toString(KeyCacheObjectWrapper.class, this); } } + + /** + * + */ + private class PerStripeBuffer { + /** */ + private final int partId; + + /** */ + private List<DataStreamerEntry> entries; + + /** */ + private GridFutureAdapter<Object> curFut; + + /** Batch topology. */ + private AffinityTopologyVersion batchTopVer; + + /** */ + private final IgniteInClosure<? super IgniteInternalFuture<Object>> signalC; + + /** + * @param partId Partition ID. + * @param c Signal closure. + */ + public PerStripeBuffer( + int partId, + IgniteInClosure<? super IgniteInternalFuture<Object>> c + ) { + this.partId = partId; + signalC = c; + + renewBatch(false); + } + + /** + * @param remap Remap. + */ + synchronized void renewBatch(boolean remap) { + entries = newEntries(); + curFut = new GridFutureAdapter<>(); + + batchTopVer = null; + + if (!remap) + curFut.listen(signalC); + } + + /** + * @return Fresh collection with some space for outgrowth. + */ + private List<DataStreamerEntry> newEntries() { + return new ArrayList<>((int)(bufSize * 1.2)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PerStripeBuffer.class, this, super.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index b4cbf66..f70ee9c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -90,6 +90,9 @@ public class DataStreamerRequest implements Message { /** Topology version. */ private AffinityTopologyVersion topVer; + /** */ + private int partId; + /** * {@code Externalizable} support. */ @@ -113,8 +116,10 @@ public class DataStreamerRequest implements Message { * @param clsLdrId Class loader ID. * @param forceLocDep Force local deployment. * @param topVer Topology version. + * @param partId Partition ID. */ - public DataStreamerRequest(long reqId, + public DataStreamerRequest( + long reqId, byte[] resTopicBytes, @Nullable String cacheName, byte[] updaterBytes, @@ -128,7 +133,9 @@ public class DataStreamerRequest implements Message { Map<UUID, IgniteUuid> ldrParticipants, IgniteUuid clsLdrId, boolean forceLocDep, - @NotNull AffinityTopologyVersion topVer) { + @NotNull AffinityTopologyVersion topVer, + int partId + ) { assert topVer != null; this.reqId = reqId; @@ -146,6 +153,7 @@ public class DataStreamerRequest implements Message { this.clsLdrId = clsLdrId; this.forceLocDep = forceLocDep; this.topVer = topVer; + this.partId = partId; } /** @@ -253,6 +261,13 @@ public class DataStreamerRequest implements Message { return topVer; } + /** + * @return Partition ID. + */ + public int partition() { + return partId; + } + /** {@inheritDoc} */ @Override public void onAckReceived() { // No-op. @@ -324,42 +339,48 @@ public class DataStreamerRequest implements Message { writer.incrementState(); case 8: - if (!writer.writeLong("reqId", reqId)) + if (!writer.writeInt("partId", partId)) return false; writer.incrementState(); case 9: - if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) + if (!writer.writeLong("reqId", reqId)) return false; writer.incrementState(); case 10: - if (!writer.writeString("sampleClsName", sampleClsName)) + if (!writer.writeByteArray("resTopicBytes", resTopicBytes)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("skipStore", skipStore)) + if (!writer.writeString("sampleClsName", sampleClsName)) return false; writer.incrementState(); case 12: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeBoolean("skipStore", skipStore)) return false; writer.incrementState(); case 13: - if (!writer.writeByteArray("updaterBytes", updaterBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 14: + if (!writer.writeByteArray("updaterBytes", updaterBytes)) + return false; + + writer.incrementState(); + + case 15: if (!writer.writeString("userVer", userVer)) return false; @@ -447,7 +468,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 8: - reqId = reader.readLong("reqId"); + partId = reader.readInt("partId"); if (!reader.isLastRead()) return false; @@ -455,7 +476,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 9: - resTopicBytes = reader.readByteArray("resTopicBytes"); + reqId = reader.readLong("reqId"); if (!reader.isLastRead()) return false; @@ -463,7 +484,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 10: - sampleClsName = reader.readString("sampleClsName"); + resTopicBytes = reader.readByteArray("resTopicBytes"); if (!reader.isLastRead()) return false; @@ -471,7 +492,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 11: - skipStore = reader.readBoolean("skipStore"); + sampleClsName = reader.readString("sampleClsName"); if (!reader.isLastRead()) return false; @@ -479,7 +500,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 12: - topVer = reader.readMessage("topVer"); + skipStore = reader.readBoolean("skipStore"); if (!reader.isLastRead()) return false; @@ -487,7 +508,7 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 13: - updaterBytes = reader.readByteArray("updaterBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -495,6 +516,14 @@ public class DataStreamerRequest implements Message { reader.incrementState(); case 14: + updaterBytes = reader.readByteArray("updaterBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: userVer = reader.readString("userVer"); if (!reader.isLastRead()) @@ -514,6 +543,6 @@ public class DataStreamerRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; + return 16; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 6d5dc71..630d34c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.util; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.List; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -36,6 +38,8 @@ import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -62,6 +66,17 @@ public class StripedExecutor implements ExecutorService { * @param log Logger. */ public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log) { + this(cnt, igniteInstanceName, poolName, log, false); + } + + /** + * @param cnt Count. + * @param igniteInstanceName Node name. + * @param poolName Pool name. + * @param log Logger. + * @param stealTasks {@code True} to steal tasks. + */ + public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) { A.ensure(cnt > 0, "cnt > 0"); boolean success = false; @@ -76,14 +91,19 @@ public class StripedExecutor implements ExecutorService { try { for (int i = 0; i < cnt; i++) { - stripes[i] = new StripeConcurrentQueue( + stripes[i] = stealTasks ? new StripeConcurrentQueue( igniteInstanceName, poolName, i, - log); + log, stripes) : new StripeConcurrentQueue( + igniteInstanceName, + poolName, + i, + log); + } + for (int i = 0; i < cnt; i++) stripes[i].start(); - } success = true; } @@ -397,7 +417,7 @@ public class StripedExecutor implements ExecutorService { private final String poolName; /** */ - private final int idx; + protected final int idx; /** */ private final IgniteLogger log; @@ -536,8 +556,17 @@ public class StripedExecutor implements ExecutorService { * Stripe. */ private static class StripeConcurrentQueue extends Stripe { + /** */ + private static final int IGNITE_TASKS_STEALING_THRESHOLD = + IgniteSystemProperties.getInteger( + IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4); + /** Queue. */ - private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); + private final Queue<Runnable> queue; + + /** */ + @GridToStringExclude + private final Stripe[] others; /** */ private volatile boolean parked; @@ -548,16 +577,37 @@ public class StripedExecutor implements ExecutorService { * @param idx Stripe index. * @param log Logger. */ - public StripeConcurrentQueue( + StripeConcurrentQueue( String igniteInstanceName, String poolName, int idx, IgniteLogger log ) { - super(igniteInstanceName, + this(igniteInstanceName, poolName, idx, log, null); + } + + /** + * @param igniteInstanceName Ignite instance name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + StripeConcurrentQueue( + String igniteInstanceName, + String poolName, + int idx, + IgniteLogger log, + Stripe[] others + ) { + super( + igniteInstanceName, poolName, idx, log); + + this.others = others; + + this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>(); } /** {@inheritDoc} */ @@ -580,6 +630,24 @@ public class StripedExecutor implements ExecutorService { if (r != null) return r; + if(others != null) { + int len = others.length; + int init = ThreadLocalRandom.current().nextInt(len); + int cur = init; + + while (true) { + if(cur != idx) { + Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue; + + if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null) + return r; + } + + if ((cur = (cur + 1) % len) == init) + break; + } + } + LockSupport.park(); if (Thread.interrupted()) @@ -597,6 +665,13 @@ public class StripedExecutor implements ExecutorService { if (parked) LockSupport.unpark(thread); + + if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) { + for (Stripe other : others) { + if(((StripeConcurrentQueue)other).parked) + LockSupport.unpark(other.thread); + } + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java index 83a0384..b8a91a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThread.java @@ -158,6 +158,16 @@ public class IgniteThread extends Thread { } /** + * @return IgniteThread or {@code null} if current thread is not an instance of IgniteThread. + */ + public static IgniteThread current(){ + Thread thread = Thread.currentThread(); + + return thread.getClass() == IgniteThread.class || thread instanceof IgniteThread ? + ((IgniteThread)thread) : null; + } + + /** * Creates new thread name. * * @param num Thread number. http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index ec5e6d0..ac89021 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); - IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME); - try { + try (IgniteDataStreamer<String, String> ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { ldr.receiver(new StreamReceiver<String, String>() { @Override public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException { @@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { cache.put("key", threadName); } }); + ldr.addData("key", "value"); ldr.tryFlush(); @@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } }, 3_000); } - finally { - ldr.close(true); - } assertNotNull(cache.get("key")); @@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { final IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME); - IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME); - - try { + try (IgniteDataStreamer<String, String> ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) { ldr.receiver(new StringStringStreamReceiver()); ldr.addData("key", "value"); @@ -1026,9 +1021,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } }, 3_000); } - finally { - ldr.close(true); - } assertNotNull(cache.get("key")); http://git-wip-us.apache.org/repos/asf/ignite/blob/212603e1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java index e72a9b4..6d3466b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImplSelfTest.java @@ -346,7 +346,8 @@ public class DataStreamerImplSelfTest extends GridCommonAbstractTest { req.participants(), req.classLoaderId(), req.forceLocalDeployment(), - staleTop); + staleTop, + -1); msg = new GridIoMessage( GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),