https://issues.apache.org/jira/browse/IGNITE-4393
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07573183 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07573183 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07573183 Branch: refs/heads/ignite-4371 Commit: 075731835368a0c4a4e36e796105553c38ce41af Parents: d8ce5af Author: Yakov Zhdanov <[email protected]> Authored: Thu Dec 8 12:01:18 2016 +0700 Committer: Yakov Zhdanov <[email protected]> Committed: Thu Dec 8 12:01:18 2016 +0700 ---------------------------------------------------------------------- .../java/org/apache/ignite/BenchAtomic.java | 24 ++++++++++---------- .../internal/GridPerformanceSuggestions.java | 2 +- .../processors/cache/GridCacheAdapter.java | 24 +++++++++++++++++--- .../processors/cache/IgniteCacheProxy.java | 8 +++++++ .../dht/atomic/GridDhtAtomicCache.java | 5 ++-- .../local/atomic/GridLocalAtomicCache.java | 3 +++ 6 files changed, 48 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java index 4f99123..fdaf56c 100644 --- a/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java +++ b/modules/core/src/main/java/org/apache/ignite/BenchAtomic.java @@ -132,7 +132,7 @@ public class BenchAtomic { final IgniteCache<Integer, byte[]> cache0 = ignite.getOrCreateCache( BenchAtomic.<Integer, byte[]>cacheConfig(writeSync)); - final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync(); +// final IgniteCache<Integer, byte[]> asyncCache = cache0.withAsync(); final Semaphore sem = new Semaphore(2048); @@ -176,17 +176,17 @@ public class BenchAtomic { int key = ThreadLocalRandom.current().nextInt(KEYS); - if (async) { - sem.acquireUninterruptibly(); - - asyncCache.put(key, val); - - IgniteFuture<Object> f = asyncCache.future(); - - f.listen(lsnr); - - continue; - } +// if (async) { +// sem.acquireUninterruptibly(); +// +// asyncCache.put(key, val); +// +// IgniteFuture<Object> f = asyncCache.future(); +// +// f.listen(lsnr); +// +// continue; +// } boolean startTx = cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL; http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java index b040a97..5e8e520 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java @@ -89,4 +89,4 @@ public class GridPerformanceSuggestions { @Override public String toString() { return S.toString(GridPerformanceSuggestions.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index eb0a8d9..a8d9f1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Asynchronous operations limit semaphore. */ private Semaphore asyncOpsSem; + /** */ + protected volatile boolean asyncToggled; + /** {@inheritDoc} */ @Override public String name() { return cacheCfg.getName(); @@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** + * Toggles async flag if someone calls {@code withAsync()} + * on proxy and since that we have to properly handle all cache + * operations (sync and async) to put them in proper sequence. + * + * TODO: https://issues.apache.org/jira/browse/IGNITE-4393 + */ + void toggleAsync() { + if (!asyncToggled) + asyncToggled = true; + } + + /** * Prints memory stats. */ public void printMemoryStats() { @@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Put future. */ public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { + A.notNull(key, "key", val, "val"); + final boolean statsEnabled = ctx.config().isStatisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V */ public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, @Nullable final CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - if (keyCheck) validateCacheKey(key); @@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Failed future if waiting was interrupted. */ @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() { + if (!asyncToggled) + return null; + try { if (asyncOpsSem != null) asyncOpsSem.acquire(); @@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Releases asynchronous operations permit, if limited. */ protected void asyncOpRelease() { - if (asyncOpsSem != null) + if (asyncOpsSem != null && asyncToggled) asyncOpsSem.release(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f87fa1d..b9e6e82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @Override public IgniteCache<K, V> withAsync() { + if (delegate instanceof GridCacheAdapter) + ((GridCacheAdapter)delegate).toggleAsync(); + + return super.withAsync(); + } + + /** {@inheritDoc} */ @Override public IgniteCache<K, V> withSkipStore() { return skipStore(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 940c74e..0e60ff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - return updateAsync0( key, val, @@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { */ @SuppressWarnings("unchecked") protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { + if (!asyncToggled) + return op.apply(); + IgniteInternalFuture<T> fail = asyncOpAcquire(); if (fail != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/07573183/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a419887..bc16ff4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1585,6 +1585,9 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { */ @SuppressWarnings("unchecked") protected IgniteInternalFuture asyncOp(final Callable<?> op) { + if (!asyncToggled) + return ctx.closures().callLocalSafe(op); + IgniteInternalFuture fail = asyncOpAcquire(); if (fail != null)
