Fixed GridEmbeddedFuture used for async cache operations, added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e567f8cb Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e567f8cb Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e567f8cb Branch: refs/heads/ignite-1093 Commit: e567f8cb3dd88c3c0a253c4fde06f8ced9b97fde Parents: c8bc1f9 Author: sboikov <[email protected]> Authored: Thu Aug 27 12:39:52 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Aug 27 12:41:05 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../util/future/GridEmbeddedFuture.java | 55 +++- .../distributed/CacheAsyncOperationsTest.java | 280 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite3.java | 2 + 5 files changed, 342 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7adea2b..54d33e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4132,9 +4132,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final IgniteTxLocalAdapter tx0 = tx; if (fut != null && !fut.isDone()) { - IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut, - new C2<T, Exception, IgniteInternalFuture<T>>() { - @Override public IgniteInternalFuture<T> apply(T t, Exception e) { + IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut, + new IgniteOutClosure<IgniteInternalFuture>() { + @Override public IgniteInternalFuture<T> apply() { return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() { @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index c44b028..9087d20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -567,9 +567,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteInternalFuture fut = holder.future(); if (fut != null && !fut.isDone()) { - IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut, - new C2<T, Exception, IgniteInternalFuture<T>>() { - @Override public IgniteInternalFuture<T> apply(T t, Exception e) { + IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut, + new IgniteOutClosure<IgniteInternalFuture>() { + @Override public IgniteInternalFuture<T> apply() { return op.apply(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java index 4475fae..11b28b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java @@ -68,7 +68,8 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { /** * Embeds futures. Specific change order of arguments to avoid conflicts. - * @param embedded Closure. + * + * @param embedded Embedded future. * @param c Closure which runs upon completion of embedded closure and which returns another future. */ public GridEmbeddedFuture( @@ -200,6 +201,58 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> { }); } + /** + * @param embedded Embedded future. + * @param c Closure to create next future. + */ + public GridEmbeddedFuture( + IgniteInternalFuture<B> embedded, + final IgniteOutClosure<IgniteInternalFuture<A>> c + ) { + assert embedded != null; + assert c != null; + + this.embedded = embedded; + + embedded.listen(new AL1() { + @Override public void applyx(IgniteInternalFuture<B> embedded) { + try { + IgniteInternalFuture<A> next = c.apply(); + + if (next == null) { + onDone(); + + return; + } + + next.listen(new AL2() { + @Override public void applyx(IgniteInternalFuture<A> next) { + try { + onDone(next.get()); + } + catch (GridClosureException e) { + onDone(e.unwrap()); + } + catch (IgniteCheckedException | RuntimeException e) { + onDone(e); + } + catch (Error e) { + onDone(e); + + throw e; + } + } + }); + } + catch (Error e) { + onDone(e); + + throw e; + } + } + }); + } + /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { return embedded.cancel(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java new file mode 100644 index 0000000..2094d0a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * + */ +public class CacheAsyncOperationsTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static volatile CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + if (gridName.equals(getTestGridName(1))) + cfg.setClientMode(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAsyncOperationsTx() throws Exception { + asyncOperations(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testAsyncOperationsAtomic() throws Exception { + asyncOperations(ATOMIC); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + CountDownLatch latch0 = latch; + + if (latch0 != null) + latch0.countDown(); + + latch = null; + } + + /** + * @param atomicityMode Atomicity mode. + * @throws Exception If failed. + */ + public void asyncOperations(CacheAtomicityMode atomicityMode) throws Exception { + try (IgniteCache<Integer, Integer> cache = ignite(1).getOrCreateCache(cacheConfiguration(atomicityMode))) { + async1(cache); + + async2(cache); + } + } + + /** + * @param cache Cache. + */ + private void async1(IgniteCache<Integer, Integer> cache) { + cache.put(1, 1); + + latch = new CountDownLatch(1); + + IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); + + asyncCache.put(0, 0); + + IgniteFuture<?> fut1 = asyncCache.future(); + + asyncCache.getAndPut(1, 2); + + IgniteFuture<?> fut2 = asyncCache.future(); + + asyncCache.getAndPut(1, 3); + + IgniteFuture<?> fut3 = asyncCache.future(); + + assertFalse(fut1.isDone()); + assertFalse(fut2.isDone()); + assertFalse(fut3.isDone()); + + latch.countDown(); + + try { + fut1.get(); + + fail(); + } + catch (CacheException e) { + log.info("Expected error: " + e); + } + + assertEquals(1, fut2.get()); + assertEquals(2, fut3.get()); + + assertNull(cache.get(0)); + assertEquals((Integer)3, cache.get(1)); + } + /** + * + * @param cache Cache. + */ + private void async2(IgniteCache<Integer, Integer> cache) { + cache.put(1, 1); + + latch = new CountDownLatch(1); + + IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); + + asyncCache.put(0, 0); + + IgniteFuture<?> fut1 = asyncCache.future(); + + asyncCache.put(0, 0); + + IgniteFuture<?> fut2 = asyncCache.future(); + + asyncCache.getAndPut(1, 2); + + IgniteFuture<?> fut3 = asyncCache.future(); + + asyncCache.put(0, 0); + + IgniteFuture<?> fut4 = asyncCache.future(); + + assertFalse(fut1.isDone()); + assertFalse(fut2.isDone()); + assertFalse(fut3.isDone()); + assertFalse(fut4.isDone()); + + latch.countDown(); + + try { + fut1.get(); + + fail(); + } + catch (CacheException e) { + log.info("Expected error: " + e); + } + + try { + fut2.get(); + + fail(); + } + catch (CacheException e) { + log.info("Expected error: " + e); + } + + assertEquals(1, fut3.get()); + + try { + fut4.get(); + + fail(); + } + catch (CacheException e) { + log.info("Expected error: " + e); + } + + assertNull(cache.get(0)); + assertEquals((Integer)2, cache.get(1)); + } + + /** + * @param atomicityMode Atomicity mode. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setWriteThrough(true); + ccfg.setCacheStoreFactory(new StoreFactory()); + + return ccfg; + } + + /** + * + */ + private static class StoreFactory implements Factory<TestStore> { + /** {@inheritDoc} */ + @Override public TestStore create() { + return new TestStore(); + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Integer, Integer> { + /** {@inheritDoc} */ + @Override public Integer load(Integer key) throws CacheLoaderException { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { + CountDownLatch latch0 = latch; + + if (latch0 != null) + U.awaitQuiet(latch0); + + Integer key = entry.getKey(); + + if (key.equals(0)) { + System.out.println(Thread.currentThread().getName() + ": fail operation for key: " + key); + + throw new CacheWriterException("Test error."); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index 5947d33..c20e901 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -133,6 +133,8 @@ public class IgniteCacheTestSuite3 extends TestSuite { suite.addTestSuite(IgniteTxGetAfterStopTest.class); + suite.addTestSuite(CacheAsyncOperationsTest.class); + return suite; } }
