Fixed "IGNITE-5390 But in IgniteCacheTxStoreSessionWriteBehindCoalescingTest"
Signed-off-by: nikolay_tikhonov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d8a50e47 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d8a50e47 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d8a50e47 Branch: refs/heads/ignite-2.1 Commit: d8a50e47a51718c9ef202375b324695b78225813 Parents: 1cf402f Author: Alexander Belyak <[email protected]> Authored: Thu Jul 6 14:28:22 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Thu Jul 6 14:32:27 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/ClusterCachesInfo.java | 4 + .../processors/cache/GridCacheAttributes.java | 7 ++ .../store/GridCacheStoreManagerAdapter.java | 1 + .../cache/IgniteCacheAbstractTest.java | 17 ++++ ...acheStoreSessionWriteBehindAbstractTest.java | 62 +++++++++----- ...TxStoreSessionWriteBehindCoalescingTest.java | 88 ++++++++++++++++++++ ...ClientWriteBehindStoreNonCoalescingTest.java | 30 ++++--- .../testsuites/IgniteCacheTestSuite4.java | 2 + 8 files changed, 180 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 5452bd2..5aca8c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -263,6 +263,10 @@ class ClusterCachesInfo { "Write behind batch size", locAttr.writeBehindBatchSize(), rmtAttr.writeBehindBatchSize(), false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindCoalescing", + "Write behind coalescing", locAttr.writeBehindCoalescing(), rmtAttr.writeBehindCoalescing(), + false); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "writeBehindEnabled", "Write behind enabled", locAttr.writeBehindEnabled(), rmtAttr.writeBehindEnabled(), false); http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index dca4286..32871ea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -266,6 +266,13 @@ public class GridCacheAttributes implements Serializable { } /** + * @return Write coalescing flag. + */ + public boolean writeBehindCoalescing() { + return ccfg.getWriteBehindCoalescing(); + } + + /** * @return Interceptor class name. */ public String interceptorClassName() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 8ff2f5a..c02e2c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -185,6 +185,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt store.setFlushThreadCount(cfg.getWriteBehindFlushThreadCount()); store.setFlushFrequency(cfg.getWriteBehindFlushFrequency()); store.setBatchSize(cfg.getWriteBehindBatchSize()); + store.setWriteCoalescing(cfg.getWriteBehindCoalescing()); return store; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java index c5cb715..34a811b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractTest.java @@ -144,6 +144,9 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { cfg.setReadThrough(true); cfg.setWriteThrough(true); cfg.setLoadPreviousValue(true); + + cfg.setWriteBehindEnabled(writeBehindEnabled()); + cfg.setWriteBehindCoalescing(writeBehindCoalescing()); } if (cacheMode() == PARTITIONED) @@ -162,6 +165,20 @@ public abstract class IgniteCacheAbstractTest extends GridCommonAbstractTest { } /** + * @return write behind enabled flag. + */ + protected boolean writeBehindEnabled() { + return false; + } + + /** + * @return write behind coalescing flag. + */ + protected boolean writeBehindCoalescing() { + return true; + } + + /** * @return Cache loader factory. */ protected Factory<? extends CacheLoader> loaderFactory() { http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java index dcbb63f..7ad240d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java @@ -49,6 +49,9 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign private static volatile CountDownLatch latch; /** */ + protected static volatile CountDownLatch entLatch; + + /** */ private static volatile ExpectedData expData; /** {@inheritDoc} */ @@ -66,36 +69,42 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign return null; } - /** {@inheritDoc} */ + /** + * @param igniteInstanceName Ignite instance name. + * @return Cache configuration. + * @throws Exception In case of error. + */ @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration ccfg0 = super.cacheConfiguration(igniteInstanceName); - assert cfg.getCacheConfiguration().length == 1; - - CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0]; ccfg0.setReadThrough(true); ccfg0.setWriteThrough(true); ccfg0.setWriteBehindBatchSize(10); ccfg0.setWriteBehindFlushSize(10); - ccfg0.setWriteBehindFlushFrequency(60_000); + ccfg0.setWriteBehindFlushFrequency(600); ccfg0.setWriteBehindEnabled(true); ccfg0.setCacheStoreFactory(singletonFactory(new TestStore())); - CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName); + return ccfg0; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - ccfg1.setReadThrough(true); - ccfg1.setWriteThrough(true); - ccfg1.setWriteBehindBatchSize(10); - ccfg1.setWriteBehindFlushSize(10); - ccfg1.setWriteBehindFlushFrequency(60_000); - ccfg1.setWriteBehindEnabled(true); + assert cfg.getCacheConfiguration().length == 1; - ccfg1.setName(CACHE_NAME1); + CacheConfiguration ccfg0 = cacheConfiguration(igniteInstanceName); + + ccfg0.setName(DEFAULT_CACHE_NAME); + + CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName); - ccfg1.setCacheStoreFactory(singletonFactory(new TestStore())); + ccfg1.setName(CACHE_NAME1); cfg.setCacheConfiguration(ccfg0, ccfg1); @@ -120,6 +129,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign try { latch = new CountDownLatch(2); + entLatch = new CountDownLatch(11); expData = new ExpectedData("writeAll", cacheName); @@ -127,13 +137,17 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign cache.put(i, i); assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); + + assertTrue(entLatch.await(10_000,TimeUnit.MILLISECONDS)); } finally { latch = null; + entLatch = null; } try { latch = new CountDownLatch(2); + entLatch = new CountDownLatch(11); expData = new ExpectedData("deleteAll", cacheName); @@ -141,16 +155,20 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign cache.remove(i); assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS)); + + assertTrue(entLatch.await(10_000,TimeUnit.MILLISECONDS)); } finally { latch = null; + entLatch = null; } } /** * */ - private class TestStore implements CacheStore<Object, Object> { + protected class TestStore implements CacheStore<Object, Object> { + /** Auto-injected store session. */ @CacheStoreSessionResource private CacheStoreSession ses; @@ -191,10 +209,13 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign /** {@inheritDoc} */ @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException { log.info("writeAll: " + entries); - + assertTrue("Unexpected entries: " + entries, entries.size() == 10 || entries.size() == 1); checkSession("writeAll"); + + for (int i = 0; i < entries.size(); i++) + entLatch.countDown(); } /** {@inheritDoc} */ @@ -209,6 +230,9 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign assertTrue("Unexpected keys: " + keys, keys.size() == 10 || keys.size() == 1); checkSession("deleteAll"); + + for (int i = 0; i < keys.size(); i++) + entLatch.countDown(); } /** @@ -221,7 +245,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign /** * @param mtd Called stored method. */ - private void checkSession(String mtd) { + protected void checkSession(String mtd) { assertNotNull(ignite); CacheStoreSession ses = session(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java new file mode 100644 index 0000000..58cc380 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionWriteBehindCoalescingTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.integration; + +import java.util.Collection; +import javax.cache.Cache; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Integration test write behind cache store with {@link CacheConfiguration#getWriteBehindCoalescing()}={@code False} + * parameter. + */ +public class IgniteCacheTxStoreSessionWriteBehindCoalescingTest extends IgniteCacheStoreSessionWriteBehindAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @param igniteInstanceName Ignite instance name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + @SuppressWarnings("unchecked") + protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName); + + ccfg.setWriteBehindCoalescing(false); + + ccfg.setCacheStoreFactory(singletonFactory(new TestNonCoalescingStore())); + + return ccfg; + } + + /** + * + */ + private class TestNonCoalescingStore extends TestStore { + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException { + log.info("writeAll: " + entries); + + assertTrue("Unexpected entries: " + entries, entries.size() <= 10); + + checkSession("writeAll"); + + for (int i = 0; i < entries.size(); i++) + entLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + fail(); + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + log.info("deleteAll: " + keys); + + assertTrue("Unexpected keys: " + keys, keys.size() <= 10); + + checkSession("deleteAll"); + + for (int i = 0; i < keys.size(); i++) + entLatch.countDown(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java index 6a75dbd..4ffa973 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Map; +import java.util.Random; import java.util.Set; import javax.cache.Cache; import javax.cache.configuration.Factory; @@ -36,6 +37,7 @@ import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteFuture; @@ -70,6 +72,14 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac return new TestIncrementStoreFactory(); } + /** {@inheritDoc} */ + @Override protected boolean writeBehindEnabled() { return true;} + + /** {@inheritDoc} */ + @Override protected boolean writeBehindCoalescing() { return false;} + + private static Random rnd = new Random(); + /** * @throws Exception If failed. */ @@ -81,35 +91,30 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac assertEquals(cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass(), TestIncrementStoreFactory.class); - Set<Integer> keys = new HashSet<>(); - - for (int i = 0; i < 1000; i++) { - keys.add(i); - + for (int i = 0; i < CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_SIZE * 2; i++) { cache.put(i, i); } Collection<IgniteFuture<?>> futs = new ArrayList<>(); - for (int i = 0; i < 100; i++) - futs.add(updateKeys(cache, keys)); + for (int i = 0; i < 1000; i++) + futs.add(updateKey(cache)); for (IgniteFuture<?> fut : futs) fut.get(); } /** - * Update specified keys in async mode. + * Update random key in async mode. * * @param cache Cache to use. - * @param keys Keys to update. * @return IgniteFuture. */ - private IgniteFuture<?> updateKeys(IgniteCache<Integer, Integer> cache, Set<Integer> keys) { + private IgniteFuture<?> updateKey(IgniteCache<Integer, Integer> cache) { IgniteCache asyncCache = cache.withAsync(); // Using EntryProcessor.invokeAll to increment every value in place. - asyncCache.invokeAll(keys, new EntryProcessor<Integer, Integer, Object>() { + asyncCache.invoke(rnd.nextInt(100), new EntryProcessor<Integer, Integer, Object>() { @Override public Object process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException { entry.setValue(entry.getValue() + 1); @@ -150,7 +155,8 @@ public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCac @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) { Object oldVal = storeMap.put(entry.getKey(), entry.getValue()); - if (oldVal instanceof Integer && entry.getValue() instanceof Integer) { + + if (oldVal != null) { Integer oldInt = (Integer) oldVal; Integer newInt = (Integer)entry.getValue(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d8a50e47/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 1b35acb..45f575e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -134,6 +134,7 @@ import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoLo import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoReadThroughTest; import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxNoWriteThroughTest; import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionTest; +import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionWriteBehindCoalescingTest; import org.apache.ignite.internal.processors.cache.integration.IgniteCacheTxStoreSessionWriteBehindTest; import org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryLocalAtomicSwapDisabledSelfTest; import org.apache.ignite.internal.processors.cache.version.CacheVersionedEntryLocalTransactionalSelfTest; @@ -172,6 +173,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheTxStoreSessionTest.class); suite.addTestSuite(IgniteCacheAtomicStoreSessionWriteBehindTest.class); suite.addTestSuite(IgniteCacheTxStoreSessionWriteBehindTest.class); + suite.addTestSuite(IgniteCacheTxStoreSessionWriteBehindCoalescingTest.class); suite.addTestSuite(IgniteCacheAtomicNoReadThroughTest.class); suite.addTestSuite(IgniteCacheAtomicNearEnabledNoReadThroughTest.class);
