IGNITE-6669 Do not call CacheStoreSessionListener if the store operation is not executed
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8672d7d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8672d7d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8672d7d Branch: refs/heads/ignite-zk Commit: b8672d7d691981be3c10f74e97ae2caa5ddd1593 Parents: c939bdb Author: Slava Koptilin <[email protected]> Authored: Thu Nov 9 18:10:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Nov 9 18:10:31 2017 +0300 ---------------------------------------------------------------------- .../store/GridCacheStoreManagerAdapter.java | 94 +++++- ...oreListenerRWThroughDisabledAtomicCache.java | 33 ++ ...enerRWThroughDisabledTransactionalCache.java | 138 +++++++++ ...SessionListenerReadWriteThroughDisabled.java | 291 ++++++++++++++++++ ...eStoreSessionListenerWriteBehindEnabled.java | 304 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 8 +- .../Cache/Store/CacheStoreSessionTest.cs | 13 +- 7 files changed, 856 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java index 9fe1f0c..22c2381 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java @@ -106,6 +106,9 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private boolean 
writeThrough; /** */ + private boolean readThrough; + + /** */ private Collection<CacheStoreSessionListener> sesLsnrs; /** */ @@ -122,6 +125,8 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt writeThrough = cfg.isWriteThrough(); + readThrough = cfg.isReadThrough(); + this.cfgStore = cfgStore; store = cacheStoreWrapper(ctx, cfgStore, cfg); @@ -306,7 +311,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt log.debug(S.toString("Loading value from store for key", "key", storeKey, true)); - sessionInit0(tx); + sessionInit0(tx, StoreOperation.READ, false); boolean threwEx = true; @@ -442,7 +447,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (log.isDebugEnabled()) log.debug("Loading values from store for keys: " + keys0); - sessionInit0(tx); + sessionInit0(tx, StoreOperation.READ, false); boolean threwEx = true; @@ -501,7 +506,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (log.isDebugEnabled()) log.debug("Loading all values from store."); - sessionInit0(null); + sessionInit0(null, StoreOperation.READ, false); boolean threwEx = true; @@ -567,7 +572,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt "val", val0, true)); } - sessionInit0(tx); + sessionInit0(tx, StoreOperation.WRITE, false); boolean threwEx = true; @@ -622,7 +627,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (log.isDebugEnabled()) log.debug("Storing values in cache store [entries=" + entries + ']'); - sessionInit0(tx); + sessionInit0(tx, StoreOperation.WRITE, false); boolean threwEx = true; @@ -675,7 +680,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt if (log.isDebugEnabled()) log.debug(S.toString("Removing value from cache store", "key", key0, true)); - sessionInit0(tx); + sessionInit0(tx, StoreOperation.WRITE, false); boolean threwEx = 
true; @@ -727,7 +732,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt log.debug(S.toString("Removing values from cache store", "keys", keys0, true)); - sessionInit0(tx); + sessionInit0(tx, StoreOperation.WRITE, false); boolean threwEx = true; @@ -778,10 +783,10 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt boolean storeSessionEnded) throws IgniteCheckedException { assert store != null; - sessionInit0(tx); + sessionInit0(tx, commit? StoreOperation.COMMIT: StoreOperation.ROLLBACK, false); try { - if (sesLsnrs != null) { + if (sesLsnrs != null && sesHolder.get().contains(store)) { for (CacheStoreSessionListener lsnr : sesLsnrs) lsnr.onSessionEnd(locSes, commit); } @@ -820,7 +825,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** {@inheritDoc} */ @Override public void writeBehindSessionInit() throws IgniteCheckedException { - sessionInit0(null); + sessionInit0(null, null, true); } /** {@inheritDoc} */ @@ -830,9 +835,12 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt /** * @param tx Current transaction. + * @param op Store operation. + * @param writeBehindStoreInitiator {@code true} if method call is initiated by {@link GridCacheWriteBehindStore}. * @throws IgniteCheckedException If failed. */ - private void sessionInit0(@Nullable IgniteInternalTx tx) throws IgniteCheckedException { + private void sessionInit0(@Nullable IgniteInternalTx tx, @Nullable StoreOperation op, + boolean writeBehindStoreInitiator) throws IgniteCheckedException { assert sesHolder != null; SessionData ses; @@ -854,8 +862,45 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt sesHolder.set(ses); + notifyCacheStoreSessionListeners(ses, op, writeBehindStoreInitiator); + } + + /** + * @param ses Current session. + * @param op Store operation. 
+ * @param writeBehindStoreInitiator {@code True} if method call is initiated by {@link GridCacheWriteBehindStore}. + * @throws IgniteCheckedException If failed. + */ + private void notifyCacheStoreSessionListeners(SessionData ses, @Nullable StoreOperation op, + boolean writeBehindStoreInitiator) throws IgniteCheckedException { try { - if (!ses.started(store) && sesLsnrs != null) { + boolean notifyLsnrs = false; + + if (writeBehindStoreInitiator) + notifyLsnrs = !ses.started(store) && sesLsnrs != null; + else { + assert op != null; + + switch (op) { + case READ: + notifyLsnrs = readThrough && !ses.started(store) && sesLsnrs != null; + break; + + case WRITE: + notifyLsnrs = !cacheConfiguration().isWriteBehindEnabled() && writeThrough + && !ses.started(store) && sesLsnrs != null; + break; + + case COMMIT: + case ROLLBACK: + // No needs to start the session (if not started yet) and notify listeners. + break; + + default: + assert false : "Unexpected operation: " + op.toString(); + } + } + if (notifyLsnrs) { for (CacheStoreSessionListener lsnr : sesLsnrs) lsnr.onSessionStart(locSes); } @@ -871,7 +916,7 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt private void sessionEnd0(@Nullable IgniteInternalTx tx, boolean threwEx) throws IgniteCheckedException { try { if (tx == null) { - if (sesLsnrs != null) { + if (sesLsnrs != null && sesHolder.get().contains(store)) { for (CacheStoreSessionListener lsnr : sesLsnrs) lsnr.onSessionEnd(locSes, !threwEx); } @@ -995,6 +1040,14 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt return !started.remove(store); } + /** + * @param store Cache store. + * @return {@code True} if session started. + */ + private boolean contains(CacheStore store) { + return started.contains(store); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SessionData.class, this, "tx", CU.txString(tx != null ? 
tx.tx : null)); @@ -1429,4 +1482,19 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt throw new UnsupportedOperationException(); } } + + /** Enumeration that represents possible operations on the underlying store. */ + private enum StoreOperation { + /** Read key-value pair from the underlying store. */ + READ, + + /** Update or remove key from the underlying store. */ + WRITE, + + /** Commit changes to the underlying store. */ + COMMIT, + + /** Rollback changes to the underlying store. */ + ROLLBACK + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java new file mode 100644 index 0000000..9b59940 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledAtomicCache.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import org.apache.ignite.cache.CacheAtomicityMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; + +/** + * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed. + */ +public class CacheStoreListenerRWThroughDisabledAtomicCache extends CacheStoreSessionListenerReadWriteThroughDisabled { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java new file mode 100644 index 0000000..6502c97 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreListenerRWThroughDisabledTransactionalCache.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import java.util.Random; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed. + */ +public class CacheStoreListenerRWThroughDisabledTransactionalCache extends CacheStoreSessionListenerReadWriteThroughDisabled { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * Tests {@link IgniteCache#get(Object)} with disabled read-through and write-through modes. 
+ */ + public void testTransactionalLookup() { + testTransactionalLookup(OPTIMISTIC, READ_COMMITTED); + testTransactionalLookup(OPTIMISTIC, REPEATABLE_READ); + testTransactionalLookup(OPTIMISTIC, SERIALIZABLE); + + testTransactionalLookup(PESSIMISTIC, READ_COMMITTED); + testTransactionalLookup(PESSIMISTIC, REPEATABLE_READ); + testTransactionalLookup(PESSIMISTIC, SERIALIZABLE); + } + + /** + * @param concurrency Transaction concurrency level. + * @param isolation Transaction isolation level. + */ + private void testTransactionalLookup(TransactionConcurrency concurrency, TransactionIsolation isolation) { + IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + for (int i = 0; i < CNT; ++i) + cache.get(r.nextInt()); + + tx.commit(); + } + } + + /** + * Tests {@link IgniteCache#put(Object, Object)} with disabled read-through and write-through modes. + */ + public void testTransactionalUpdate() { + testTransactionalUpdate(OPTIMISTIC, READ_COMMITTED); + testTransactionalUpdate(OPTIMISTIC, REPEATABLE_READ); + testTransactionalUpdate(OPTIMISTIC, SERIALIZABLE); + + testTransactionalUpdate(PESSIMISTIC, READ_COMMITTED); + testTransactionalUpdate(PESSIMISTIC, REPEATABLE_READ); + testTransactionalUpdate(PESSIMISTIC, SERIALIZABLE); + } + + /** + * @param concurrency Transaction concurrency level. + * @param isolation Transaction isolation level. + */ + private void testTransactionalUpdate(TransactionConcurrency concurrency, TransactionIsolation isolation) { + IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + for (int i = 0; i < CNT; ++i) + cache.put(r.nextInt(), "test-value"); + + tx.commit(); + } + } + + /** + * Tests {@link IgniteCache#remove(Object)} with disabled read-through and write-through modes. 
+ */ + public void testTransactionalRemove() { + testTransactionalRemove(OPTIMISTIC, READ_COMMITTED); + testTransactionalRemove(OPTIMISTIC, REPEATABLE_READ); + testTransactionalRemove(OPTIMISTIC, SERIALIZABLE); + + testTransactionalRemove(PESSIMISTIC, READ_COMMITTED); + testTransactionalRemove(PESSIMISTIC, REPEATABLE_READ); + testTransactionalRemove(PESSIMISTIC, SERIALIZABLE); + } + + /** + * @param concurrency Transaction concurrency level. + * @param isolation Transaction isolation level. + */ + private void testTransactionalRemove(TransactionConcurrency concurrency, TransactionIsolation isolation) { + IgniteCache cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + try (Transaction tx = grid(0).transactions().txStart(concurrency, isolation)) { + for (int i = 0; i < CNT; ++i) { + int key = r.nextInt(); + + cache.put(key, "test-value"); + + cache.remove(key, "test-value"); + } + + tx.commit(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java new file mode 100644 index 0000000..1f6e97d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerReadWriteThroughDisabled.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.HashSet; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.logging.Logger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.sql.DataSource; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; + +/** + * This class tests that redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are not executed. 
+ */ +public abstract class CacheStoreSessionListenerReadWriteThroughDisabled extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + protected int gridCount() { + return 2; + } + + /** */ + protected final int CNT = 100; + + /** {@inheritDoc} */ + protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName); + + cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class)); + + cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory()); + + cacheCfg.setReadThrough(false); + cacheCfg.setWriteThrough(false); + + cacheCfg.setBackups(0); + + return cacheCfg; + } + + /** {@inheritDoc} */ + protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#get(Object)} is performed. + * + * @throws Exception If failed. + */ + public void testLookup() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + for (int i = 0; i < CNT; ++i) + cache.get(r.nextInt()); + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#getAll(Set)} is performed. + * + * @throws Exception If failed.
+ */ + public void testBatchLookup() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + Set<Object> values = new HashSet<>(); + + for (int i = 0; i < CNT; ++i) + values.add(r.nextInt()); + + cache.getAll(values); + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#put(Object, Object)} is performed. + * + * @throws Exception If failed. + */ + public void testUpdate() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + for (int i = 0; i < CNT; ++i) + cache.put(r.nextInt(), "test-value"); + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#putAll(Map)} is performed. + * + * @throws Exception If failed. + */ + public void testBatchUpdate() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + Map<Object, Object> values = new TreeMap<>(); + + for (int i = 0; i < CNT; ++i) + values.put(r.nextInt(), "test-value"); + + cache.putAll(values); + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#remove(Object)} is performed. + * + * @throws Exception If failed.
+ */ + public void testRemove() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + for (int i = 0; i < CNT; ++i) { + int key = r.nextInt(); + + cache.put(key, "test-value"); + + cache.remove(key); + } + } + + /** + * Tests that there are no calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} and + * {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} + * while {@link IgniteCache#removeAll(Set)} is performed. + * + * @throws Exception If failed. + */ + public void testBatchRemove() throws Exception { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + Random r = new Random(); + + Set<Object> values = new HashSet<>(); + + for (int i = 0; i < CNT; ++i) { + int key = r.nextInt(); + + cache.put(key, "test-value"); + + values.add(key); + } + + cache.removeAll(values); + } + + /** + * Cache store session factory. + */ + public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> { + /** {@inheritDoc} */ + @Override public TestCacheStoreSessionListener create() { + TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener(); + lsnr.setDataSource(new DataSourceStub()); + return lsnr; + } + } + + /** + * Test cache store session listener. + */ + public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener { + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + fail("TestCacheStoreSessionListener.onSessionStart(CacheStoreSession) should not be called."); + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + fail("TestCacheStoreSessionListener.onSessionEnd(CacheStoreSession, boolean) should not be called."); + } + } + + /** Empty cache store implementation. All overridden methods should not be called while the test is running.
*/ + public static class EmptyCacheStore extends CacheStoreAdapter { + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + fail("EmptyCacheStore.load(Object) should not be called."); + + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + fail("EmptyCacheStore.write(Cache.Entry) should not be called."); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + fail("EmptyCacheStore.delete(Object) should not be called."); + } + } + + /** + * Data source stub which should not be called. + */ + public static class DataSourceStub implements DataSource, Serializable { + /** {@inheritDoc} */ + @Override public Connection getConnection() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Connection getConnection(String username, String password) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public PrintWriter getLogWriter() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setLogWriter(PrintWriter out) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setLoginTimeout(int seconds) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getLoginTimeout() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Logger getParentLogger() throws 
SQLFeatureNotSupportedException { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java new file mode 100644 index 0000000..fbb881e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreSessionListenerWriteBehindEnabled.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.sql.DataSource; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest; +import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * This class tests that calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * and {@link CacheStoreSessionListener#onSessionEnd(CacheStoreSession, boolean)} are executed from + * {@link GridCacheWriteBehindStore} only. 
+ */ +public class CacheStoreSessionListenerWriteBehindEnabled extends GridCacheAbstractSelfTest { + /** */ + protected final static int CNT = 100; + + /** */ + private final static int WRITE_BEHIND_FLUSH_FREQUENCY = 1000; + + /** */ + private static final List<OperationType> operations = Collections.synchronizedList(new ArrayList<OperationType>()); + + /** */ + private static final AtomicInteger entryCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration cacheCfg = super.cacheConfiguration(igniteInstanceName); + + cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(EmptyCacheStore.class)); + + cacheCfg.setCacheStoreSessionListenerFactories(new CacheStoreSessionFactory()); + + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + + cacheCfg.setWriteBehindEnabled(true); + cacheCfg.setWriteBehindBatchSize(CNT * 2); + cacheCfg.setWriteBehindFlushFrequency(WRITE_BEHIND_FLUSH_FREQUENCY); + + cacheCfg.setBackups(0); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + operations.clear(); + + entryCnt.set(0); + } + + /** + * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * while {@link IgniteCache#get(Object)} performed. + */ + public void testLookup() { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < CNT; ++i) + cache.get(i); + + checkSessionCounters(CNT); + } + + /** + * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * while {@link IgniteCache#put(Object, Object)} performed. 
+ */ + public void testUpdate() { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < CNT; ++i) + cache.put(i, i); + + checkSessionCounters(1); + } + + /** + * Tests that there are no redundant calls of {@link CacheStoreSessionListener#onSessionStart(CacheStoreSession)} + * while {@link IgniteCache#remove(Object)} performed. + */ + public void testRemove() { + IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < CNT; ++i) { + cache.remove(i); + } + + checkSessionCounters(1); + } + + /** + * @param startedSessions Number of expected sessions. + */ + private void checkSessionCounters(int startedSessions) { + try { + // Wait for GridCacheWriteBehindStore + Thread.sleep(WRITE_BEHIND_FLUSH_FREQUENCY * 4); + + assertEquals(CNT, entryCnt.get()); + + checkOpCount(operations, OperationType.SESSION_START, startedSessions); + + checkOpCount(operations, OperationType.SESSION_END, startedSessions); + } + catch (InterruptedException e) { + throw new IgniteException("Failed to wait for the GridCacheWriteBehindStore due to interruption.", e); + } + } + + /** + * @param operations List of {@link OperationType}. + * @param op Operation. + * @param expected Expected number of operations for the given {@code op}. + */ + private void checkOpCount(List<OperationType> operations, OperationType op, int expected) { + int n = 0; + + for (OperationType o : operations) { + if (op.equals(o)) + ++n; + } + + assertEquals("Operation=" + op.name(), expected, n); + } + + /** + * Operation type. + */ + public enum OperationType { + /** + * Cache store session started. + */ + SESSION_START, + + /** + * Cache store session ended. + */ + SESSION_END, + } + + /** + * Cache store session factory. 
+ */ + public static class CacheStoreSessionFactory implements Factory<TestCacheStoreSessionListener> { + /** {@inheritDoc} */ + @Override public TestCacheStoreSessionListener create() { + TestCacheStoreSessionListener lsnr = new TestCacheStoreSessionListener(); + lsnr.setDataSource(new DataSourceStub()); + return lsnr; + } + } + + /** + * Test cache store session listener. + */ + public static class TestCacheStoreSessionListener extends CacheJdbcStoreSessionListener { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void onSessionStart(CacheStoreSession ses) { + operations.add(OperationType.SESSION_START); + } + + /** {@inheritDoc} */ + @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) { + operations.add(OperationType.SESSION_END); + } + } + + /** + * Test cache store. + * + * {@link EmptyCacheStore#writeAll(Collection)} and {@link EmptyCacheStore#deleteAll(Collection)} should be called + * by {@link GridCacheWriteBehindStore}. + */ + public static class EmptyCacheStore extends CacheStoreAdapter<Object, Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + entryCnt.getAndIncrement(); + return null; + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { + entryCnt.addAndGet(entries.size()); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) { + entryCnt.addAndGet(keys.size()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + } + } + + /** + * Data source stub which should not be called. 
+ */ + public static class DataSourceStub implements DataSource, Serializable { + /** {@inheritDoc} */ + @Override public Connection getConnection() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Connection getConnection(String username, String password) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isWrapperFor(Class<?> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public PrintWriter getLogWriter() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setLogWriter(PrintWriter out) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void setLoginTimeout(int seconds) throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int getLoginTimeout() throws SQLException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index d931ea9..e4930e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -18,6 +18,9 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledAtomicCache; +import org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledTransactionalCache; +import org.apache.ignite.cache.store.CacheStoreSessionListenerWriteBehindEnabled; import org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest; import org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest; import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest; @@ -276,6 +279,9 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheOffheapMapEntrySelfTest.class); suite.addTestSuite(CacheJdbcStoreSessionListenerSelfTest.class); + suite.addTestSuite(CacheStoreListenerRWThroughDisabledAtomicCache.class); + suite.addTestSuite(CacheStoreListenerRWThroughDisabledTransactionalCache.class); + suite.addTestSuite(CacheStoreSessionListenerWriteBehindEnabled.class); suite.addTestSuite(CacheClientStoreSelfTest.class); suite.addTestSuite(CacheStoreUsageMultinodeStaticStartAtomicTest.class); @@ -341,4 +347,4 @@ public class IgniteCacheTestSuite4 extends TestSuite { return suite; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b8672d7d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs index 818948c..6c9def3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Store/CacheStoreSessionTest.cs @@ -106,17 +106,8 
@@ namespace Apache.Ignite.Core.Tests.Cache.Store tx.Rollback(); } - // SessionEnd is called once per store instance. - Assert.AreEqual(StoreCount, _dumps.Count); - - foreach (var ops in _dumps) - { - var op = ops.Single(); - Assert.AreEqual(OperationType.SesEnd, op.Type); - Assert.IsFalse(op.Commit); - } - - _dumps = new ConcurrentBag<ICollection<Operation>>(); + // SessionEnd should not be called. + Assert.AreEqual(0, _dumps.Count); // 2. Test puts. using (var tx = ignite.GetTransactions().TxStart())
