IGNITE-4927 Write behind - add an option to skip write coalescing Signed-off-by: nikolay_tikhonov <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/22580e19 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/22580e19 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/22580e19 Branch: refs/heads/ignite-4929 Commit: 22580e19b7ae5d11b8c299e2b3d92f5c8b9f0e8c Parents: c4d8180 Author: Alexander Belyak <[email protected]> Authored: Tue Apr 18 14:56:50 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Apr 18 15:57:45 2017 +0300 ---------------------------------------------------------------------- .../configuration/CacheConfiguration.java | 33 + .../cache/store/GridCacheWriteBehindStore.java | 614 ++++++++++++++++--- ...idCacheWriteBehindStoreAbstractSelfTest.java | 24 +- .../GridCacheWriteBehindStoreAbstractTest.java | 4 + ...heWriteBehindStoreMultithreadedSelfTest.java | 88 ++- .../GridCacheWriteBehindStoreSelfTest.java | 159 ++++- ...ClientWriteBehindStoreNonCoalescingTest.java | 175 ++++++ .../IgniteCacheWriteBehindTestSuite.java | 2 + 8 files changed, 978 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index b5afba4..2308a10 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -176,6 +176,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Default batch size for write-behind cache store. 
*/ public static final int DFLT_WRITE_BEHIND_BATCH_SIZE = 512; + /** Default write coalescing for write-behind cache store. */ + public static final boolean DFLT_WRITE_BEHIND_COALESCING = true; + /** Default maximum number of query iterators that can be stored. */ public static final int DFLT_MAX_QUERY_ITERATOR_CNT = 1024; @@ -310,6 +313,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { /** Maximum batch size for write-behind cache store. */ private int writeBehindBatchSize = DFLT_WRITE_BEHIND_BATCH_SIZE; + /** Write coalescing flag for write-behind cache store */ + private boolean writeBehindCoalescing = DFLT_WRITE_BEHIND_COALESCING; + /** Maximum number of query iterators that can be stored. */ private int maxQryIterCnt = DFLT_MAX_QUERY_ITERATOR_CNT; @@ -454,6 +460,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { tmLookupClsName = cc.getTransactionManagerLookupClassName(); topValidator = cc.getTopologyValidator(); writeBehindBatchSize = cc.getWriteBehindBatchSize(); + writeBehindCoalescing = cc.getWriteBehindCoalescing(); writeBehindEnabled = cc.isWriteBehindEnabled(); writeBehindFlushFreq = cc.getWriteBehindFlushFrequency(); writeBehindFlushSize = cc.getWriteBehindFlushSize(); @@ -1287,6 +1294,32 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { } /** + * Write coalescing flag for write-behind cache store operations. Store operations (get or remove) + * with the same key are combined or coalesced to single, resulting operation + * to reduce pressure to underlying cache store. + * <p/> + * If not provided, default value is {@link #DFLT_WRITE_BEHIND_COALESCING}. + * + * @return Write coalescing flag. + */ + public boolean getWriteBehindCoalescing() { + return writeBehindCoalescing; + } + + /** + * Sets write coalescing flag for write-behind cache. + * + * @param writeBehindCoalescing Write coalescing flag. 
+ * @see #getWriteBehindCoalescing() + * @return {@code this} for chaining. + */ + public CacheConfiguration<K, V> setWriteBehindCoalescing(boolean writeBehindCoalescing) { + this.writeBehindCoalescing = writeBehindCoalescing; + + return this; + } + + /** * Use {@link IgniteConfiguration#getRebalanceThreadPoolSize()} instead. * * @return Size of rebalancing thread pool. http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index 91008ce..64238ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -17,17 +17,19 @@ package org.apache.ignite.internal.processors.cache.store; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.cache.integration.CacheWriterException; @@ -43,9 +45,11 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import 
org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentLinkedDeque8; import org.jsr166.ConcurrentLinkedHashMap; import static javax.cache.Cache.Entry; @@ -65,6 +69,8 @@ import static javax.cache.Cache.Entry; * <p/> * Since write operations to the cache store are deferred, transaction support is lost; no * transaction objects are passed to the underlying store. + * <p/> + * {@link GridCacheWriteBehindStore} doesn't support concurrent modifications of the same key. */ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, LifecycleAware { /** Default write cache initial capacity. */ @@ -91,6 +97,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy /** Count of worker threads performing underlying store updates. */ private int flushThreadCnt = CacheConfiguration.DFLT_WRITE_FROM_BEHIND_FLUSH_THREAD_CNT; + /** Is flush threads count power of two flag. */ + private boolean flushThreadCntIsPowerOfTwo; + /** Cache flush frequency. All pending operations will be performed in not less then this value ms. */ private long cacheFlushFreq = CacheConfiguration.DFLT_WRITE_BEHIND_FLUSH_FREQUENCY; @@ -98,29 +107,26 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy private int batchSize = CacheConfiguration.DFLT_WRITE_BEHIND_BATCH_SIZE; /** Ignite instance name. */ - private String igniteInstanceName; + private final String igniteInstanceName; /** Cache name. */ - private String cacheName; + private final String cacheName; /** Underlying store. */ - private CacheStore<K, V> store; + private final CacheStore<K, V> store; /** Write cache. */ private ConcurrentLinkedHashMap<K, StatefulValue<K, V>> writeCache; /** Flusher threads. 
*/ - private GridWorker[] flushThreads; + private Flusher[] flushThreads; + + /** Write coalescing. */ + private boolean writeCoalescing = CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING; /** Atomic flag indicating store shutdown. */ private AtomicBoolean stopping = new AtomicBoolean(true); - /** Flush lock. */ - private Lock flushLock = new ReentrantLock(); - - /** Condition to determine records available for flush. */ - private Condition canFlush = flushLock.newCondition(); - /** Variable for counting total cache overflows. */ private AtomicInteger cacheTotalOverflowCntr = new AtomicInteger(); @@ -131,10 +137,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy private AtomicInteger retryEntriesCnt = new AtomicInteger(); /** Log. */ - private IgniteLogger log; + private final IgniteLogger log; /** Store manager. */ - private CacheStoreManager storeMgr; + private final CacheStoreManager storeMgr; + + /** Flush lock. */ + private final Lock flushLock = new ReentrantLock(); + + /** Condition to determine records available for flush. */ + private Condition canFlush = flushLock.newCondition(); /** * Creates a write-behind cache store for the given store. @@ -193,7 +205,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * <p/> * If this value is {@code 0}, then flush is performed only on time-elapsing basis. However, * when this value is {@code 0}, the cache critical size is set to - * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE} + * {@link CacheConfiguration#DFLT_WRITE_BEHIND_CRITICAL_SIZE}. * * @return Buffer size that triggers flush procedure. 
*/ @@ -208,6 +220,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy */ public void setFlushThreadCount(int flushThreadCnt) { this.flushThreadCnt = flushThreadCnt; + this.flushThreadCntIsPowerOfTwo = (flushThreadCnt & (flushThreadCnt - 1)) == 0; } /** @@ -220,6 +233,24 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** + * Sets the write coalescing flag. + * + * @param writeCoalescing Write coalescing flag. + */ + public void setWriteCoalescing(boolean writeCoalescing) { + this.writeCoalescing = writeCoalescing; + } + + /** + * Gets the write coalescing flag. + * + * @return Write coalescing flag. + */ + public boolean getWriteCoalescing() { + return writeCoalescing; + } + + /** * Sets the cache flush frequency. All pending operations on the underlying store will be performed * within time interval not less then this value. * @@ -266,7 +297,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * @return Total count of entries in cache store internal buffer. 
*/ public int getWriteBehindBufferSize() { - return writeCache.sizex(); + if (writeCoalescing) + return writeCache.sizex(); + else { + int size = 0; + + for (Flusher f : flushThreads) + size += f.size(); + + return size; + } } /** @@ -292,14 +332,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy if (cacheCriticalSize == 0) cacheCriticalSize = CacheConfiguration.DFLT_WRITE_BEHIND_CRITICAL_SIZE; - flushThreads = new GridWorker[flushThreadCnt]; + flushThreads = new GridCacheWriteBehindStore.Flusher[flushThreadCnt]; - writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl); + if (writeCoalescing) + writeCache = new ConcurrentLinkedHashMap<>(initCap, 0.75f, concurLvl); for (int i = 0; i < flushThreads.length; i++) { flushThreads[i] = new Flusher(igniteInstanceName, "flusher-" + i, log); - new IgniteThread(flushThreads[i]).start(); + flushThreads[i].start(); } } } @@ -344,7 +385,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy if (log.isDebugEnabled()) log.debug("Stopping write-behind store for cache '" + cacheName + '\''); - wakeUp(); + for (Flusher f : flushThreads) { + if (!f.isEmpty()) + f.wakeUp(); + } boolean graceful = true; @@ -352,7 +396,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy graceful &= U.join(worker, log); if (!graceful) - log.warning("Shutdown was aborted"); + log.warning("Write behind store shutdown was aborted."); } } @@ -361,7 +405,10 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * @throws IgniteCheckedException If failed. 
*/ public void forceFlush() throws IgniteCheckedException { - wakeUp(); + for (Flusher f : flushThreads) { + if (!f.isEmpty()) + f.wakeUp(); + } } /** {@inheritDoc} */ @@ -376,10 +423,15 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy Map<K, V> loaded = new HashMap<>(); - Collection<K> remaining = new LinkedList<>(); + Collection<K> remaining = null; for (K key : keys) { - StatefulValue<K, V> val = writeCache.get(key); + StatefulValue<K, V> val; + + if (writeCoalescing) + val = writeCache.get(key); + else + val = flusher(key).flusherWriteMap.get(key); if (val != null) { val.readLock().lock(); @@ -394,12 +446,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy val.readLock().unlock(); } } - else + else { + if (remaining == null) + remaining = new ArrayList<>(); + remaining.add(key); + } } // For items that were not found in queue. - if (!remaining.isEmpty()) { + if (remaining != null && !remaining.isEmpty()) { Map<K, V> loaded0 = store.loadAll(remaining); if (loaded0 != null) @@ -414,7 +470,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy if (log.isDebugEnabled()) log.debug("Store load [key=" + key + ']'); - StatefulValue<K, V> val = writeCache.get(key); + StatefulValue<K, V> val; + + if (writeCoalescing) + val = writeCache.get(key); + else + val = flusher(key).flusherWriteMap.get(key); if (val != null) { val.readLock().lock(); @@ -493,7 +554,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * * @param key Key for which update is performed. * @param val New value, may be null for remove operation. - * @param operation Updated value status + * @param operation Updated value status. * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. 
*/ private void updateCache(K key, @@ -502,8 +563,27 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy throws IgniteInterruptedCheckedException { StatefulValue<K, V> newVal = new StatefulValue<>(val, operation); + if (writeCoalescing) + putToWriteCache(key, newVal); + else + flusher(key).putToFlusherWriteCache(key, newVal); + } + + /** + * Performs flush-consistent writeCache update for the given key. + * + * @param key Key for which update is performed. + * @param newVal stateful value to put + * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. + */ + private void putToWriteCache( + K key, + StatefulValue<K, V> newVal) + throws IgniteInterruptedCheckedException { StatefulValue<K, V> prev; + assert writeCoalescing : "Unexpected write coalescing."; + while ((prev = writeCache.putIfAbsent(key, newVal)) != null) { prev.writeLock().lock(); @@ -523,7 +603,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy assert prev.status() == ValueStatus.NEW || prev.status() == ValueStatus.RETRY; - prev.update(val, operation, ValueStatus.NEW); + prev.update(newVal.val, newVal.operation(), ValueStatus.NEW); break; } @@ -533,14 +613,33 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } // Now check the map size - if (writeCache.sizex() > cacheCriticalSize) + int cacheSize = getWriteBehindBufferSize(); + + if (cacheSize > cacheCriticalSize) // Perform single store update in the same thread. flushSingleValue(); - else if (cacheMaxSize > 0 && writeCache.sizex() > cacheMaxSize) + else if (cacheMaxSize > 0 && cacheSize > cacheMaxSize) wakeUp(); } /** + * Return flusher by by key. + * + * @param key Key for search. + * @return flusher. 
+ */ + private Flusher flusher(K key) { + int h, idx; + + if (flushThreadCntIsPowerOfTwo) + idx = ((h = key.hashCode()) ^ (h >>> 16)) & (flushThreadCnt - 1); + else + idx = ((h = key.hashCode()) ^ (h >>> 16)) % flushThreadCnt; + + return flushThreads[idx]; + } + + /** * Flushes one upcoming value to the underlying store. Called from * {@link #updateCache(Object, Entry, StoreOperation)} method in case when current map size exceeds * critical size. @@ -549,7 +648,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy cacheOverflowCntr.incrementAndGet(); try { - Map<K, StatefulValue<K, V>> batch = null; + Map<K, StatefulValue<K, V>> batch; for (Map.Entry<K, StatefulValue<K, V>> e : writeCache.entrySet()) { StatefulValue<K, V> val = e.getValue(); @@ -577,7 +676,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } if (!batch.isEmpty()) { - applyBatch(batch, false); + applyBatch(batch, false, null); cacheTotalOverflowCntr.incrementAndGet(); @@ -595,9 +694,12 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * * @param valMap Batch map. * @param initSes {@code True} if need to initialize session. + * @param flusher Flusher associated with all keys in the batch (meaningful only when write coalescing is disabled). + * @return {@code True} if batch was successfully applied, {@code False} otherwise. 
*/ - private void applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes) { + private boolean applyBatch(Map<K, StatefulValue<K, V>> valMap, boolean initSes, Flusher flusher) { assert valMap.size() <= batchSize; + assert !valMap.isEmpty(); StoreOperation operation = null; @@ -615,7 +717,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy batch.put(e.getKey(), e.getValue().entry()); } - if (updateStore(operation, batch, initSes)) { + boolean result = updateStore(operation, batch, initSes, flusher); + + if (result) { for (Map.Entry<K, StatefulValue<K, V>> e : valMap.entrySet()) { StatefulValue<K, V> val = e.getValue(); @@ -624,12 +728,22 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy try { val.status(ValueStatus.FLUSHED); - StatefulValue<K, V> prev = writeCache.remove(e.getKey()); + if (writeCoalescing) { + StatefulValue<K, V> prev = writeCache.remove(e.getKey()); - // Additional check to ensure consistency. - assert prev == val : "Map value for key " + e.getKey() + " was updated during flush"; + // Additional check to ensure consistency. + assert prev == val : "Map value for key " + e.getKey() + " was updated during flush"; - val.signalFlushed(); + val.signalFlushed(); + } + else { + Flusher f = flusher(e.getKey()); + + // Can remove using equal because if map contains another similar value it has different state. + f.flusherWriteMap.remove(e.getKey(), e.getValue()); + + val.signalFlushed(); + } } finally { val.writeLock().unlock(); @@ -653,6 +767,8 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } } } + + return result; } /** @@ -666,13 +782,16 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy * @param operation Status indicating operation that should be performed. * @param vals Key-Value map. * @param initSes {@code True} if need to initialize session. 
+ * @param flusher Flusher, assotiated with vals keys (in writeCoalescing=false mode) * @return {@code true} if value may be deleted from the write cache, * {@code false} otherwise */ - private boolean updateStore(StoreOperation operation, + private boolean updateStore( + StoreOperation operation, Map<K, Entry<? extends K, ? extends V>> vals, - boolean initSes) { - + boolean initSes, + Flusher flusher + ) { try { if (initSes && storeMgr != null) storeMgr.writeBehindSessionInit(); @@ -707,7 +826,14 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy catch (Exception e) { LT.error(log, e, "Unable to update underlying store: " + store); - if (writeCache.sizex() > cacheCriticalSize || stopping.get()) { + boolean overflow; + + if (writeCoalescing) + overflow = writeCache.sizex() > cacheCriticalSize || stopping.get(); + else + overflow = flusher.isOverflowed() || stopping.get(); + + if (overflow) { for (Map.Entry<K, Entry<? extends K, ? extends V>> entry : vals.entrySet()) { Object val = entry.getValue() != null ? entry.getValue().getValue() : null; @@ -738,29 +864,163 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** - * Thread that performs time-based flushing of written values to the underlying storage. + * Thread that performs time/size-based flushing of written values to the underlying storage. */ private class Flusher extends GridWorker { + /** Queue to flush. */ + private final ConcurrentLinkedDeque8<IgniteBiTuple<K, StatefulValue<K,V>>> queue; + + /** Flusher write map. */ + private final ConcurrentHashMap<K, StatefulValue<K,V>> flusherWriteMap; + + /** Critical size of flusher local queue. */ + private final int flusherCacheCriticalSize; + + /** Flusher parked flag. */ + private volatile boolean parked; + + /** Flusher thread. */ + protected Thread thread; + + /** Cache flushing frequence in nanos. */ + protected long cacheFlushFreqNanos = cacheFlushFreq * 1000; + + /** Writer lock. 
*/ + private final Lock flusherWriterLock = new ReentrantLock(); + + /** Condition to determine available space for flush. */ + private Condition flusherWriterCanWrite = flusherWriterLock.newCondition(); + /** {@inheritDoc */ - protected Flusher(String igniteInstanceName, String name, IgniteLogger log) { + protected Flusher(String igniteInstanceName, + String name, + IgniteLogger log) { super(igniteInstanceName, name, log); + + flusherCacheCriticalSize = cacheCriticalSize/flushThreadCnt; + + assert flusherCacheCriticalSize > batchSize; + + if (writeCoalescing) { + queue = null; + flusherWriteMap = null; + } + else { + queue = new ConcurrentLinkedDeque8<>(); + flusherWriteMap = new ConcurrentHashMap<>(initCap, 0.75f, concurLvl); + } + } + + /** Start flusher thread. */ + protected void start() { + thread = new IgniteThread(this); + thread.start(); + } + + /** + * Performs flush-consistent flusher writeCache update for the given key. + * + * @param key Key for which update is performed. + * @param newVal Stateful value to put. + * @throws IgniteInterruptedCheckedException If interrupted while waiting for value to be flushed. 
+ */ + private void putToFlusherWriteCache( + K key, + StatefulValue<K, V> newVal) + throws IgniteInterruptedCheckedException { + assert !writeCoalescing : "Unexpected write coalescing."; + + if (queue.sizex() > flusherCacheCriticalSize) { + while (queue.sizex() > flusherCacheCriticalSize) { + wakeUp(); + + flusherWriterLock.lock(); + + try { + // Wait for free space in flusher queue + while (queue.sizex() >= flusherCacheCriticalSize && !stopping.get()) { + if (cacheFlushFreq > 0) + flusherWriterCanWrite.await(cacheFlushFreq, TimeUnit.MILLISECONDS); + else + flusherWriterCanWrite.await(); + } + + cacheTotalOverflowCntr.incrementAndGet(); + } + catch (InterruptedException e) { + if (log.isDebugEnabled()) + log.debug("Caught interrupted exception: " + e); + + Thread.currentThread().interrupt(); + } + finally { + flusherWriterLock.unlock(); + } + } + + cacheTotalOverflowCntr.incrementAndGet(); + } + + queue.add(F.t(key, newVal)); + + flusherWriteMap.put(key, newVal); + } + + /** + * Get overflowed flag. + * + * @return {@code True} if write behind flusher is overflowed, + * {@code False} otherwise. + */ + public boolean isOverflowed() { + if (writeCoalescing) + return writeCache.sizex() > cacheCriticalSize; + else + return queue.sizex() > flusherCacheCriticalSize; + } + + /** + * Get write behind flusher size. + * + * @return Flusher write behind size. + */ + public int size() { + return writeCoalescing ? writeCache.sizex() : queue.sizex(); + } + + /** + * Test if write behind flusher is empty + * + * @return {@code True} if write behind flusher is empty, {@code False} otherwise + */ + public boolean isEmpty() { + return writeCoalescing ? 
writeCache.isEmpty() : queue.isEmpty(); } /** {@inheritDoc} */ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { - while (!stopping.get() || writeCache.sizex() > 0) { - awaitOperationsAvailable(); + if (writeCoalescing) { + while (!stopping.get() || writeCache.sizex() > 0) { + awaitOperationsAvailableCoalescing(); - flushCache(writeCache.entrySet().iterator()); + flushCacheCoalescing(); + } + } + else { + while (!stopping.get() || queue.sizex() > 0) { + awaitOperationsAvailableNonCoalescing(); + + flushCacheNonCoalescing(); + } } } /** - * This method awaits until enough elements in map are available or given timeout is over. + * This method awaits until enough elements in flusher queue are available or given timeout is over. * * @throws InterruptedException If awaiting was interrupted. */ - private void awaitOperationsAvailable() throws InterruptedException { + private void awaitOperationsAvailableCoalescing() throws InterruptedException { flushLock.lock(); try { @@ -780,74 +1040,215 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** + * This method awaits until enough elements in flusher queue are available or given timeout is over. + * + * @throws InterruptedException If awaiting was interrupted. + */ + private void awaitOperationsAvailableNonCoalescing() throws InterruptedException { + if (queue.sizex() >= batchSize) + return; + + parked = true; + + try { + for (;;) { + if (queue.sizex() >= batchSize) + return; + + if (cacheFlushFreq > 0) + LockSupport.parkNanos(cacheFlushFreqNanos); + else + LockSupport.park(); + + if (queue.sizex() > 0) + return; + + if (Thread.interrupted()) + throw new InterruptedException(); + + if (stopping.get()) + return; + } + } + finally { + parked = false; + } + } + + /** + * Wake up flusher thread. 
+ */ + public void wakeUp() { + if (parked) + LockSupport.unpark(thread); + } + + /** * Removes values from the write cache and performs corresponding operation * on the underlying store. - * - * @param it Iterator for write cache. */ - private void flushCache(Iterator<Map.Entry<K,StatefulValue<K, V>>> it) { - StoreOperation operation = null; + private void flushCacheCoalescing() { + StoreOperation prevOperation = null; - Map<K, StatefulValue<K, V>> batch = null; - Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); + Map<K, StatefulValue<K, V>> pending = U.newLinkedHashMap(batchSize); + Iterator<Map.Entry<K, StatefulValue<K, V>>> it = writeCache.entrySet().iterator(); while (it.hasNext()) { Map.Entry<K, StatefulValue<K, V>> e = it.next(); - StatefulValue<K, V> val = e.getValue(); - val.writeLock().lock(); + if (!val.writeLock().tryLock()) // TODO: stripe write maps to avoid lock contention. + continue; try { - ValueStatus status = val.status(); + BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, e.getKey(), val); - if (acquired(status)) - // Another thread is helping us, continue to the next entry. - continue; - - if (status == ValueStatus.RETRY) - retryEntriesCnt.decrementAndGet(); + switch (addRes) { + case NEW_BATCH: + applyBatch(pending, true, null); - assert retryEntriesCnt.get() >= 0; + pending = U.newLinkedHashMap(batchSize); - val.status(ValueStatus.PENDING); + // No need to test first value in batch + val.status(ValueStatus.PENDING); + pending.put(e.getKey(), val); + prevOperation = val.operation(); - // We scan for the next operation and apply batch on operation change. Null means new batch. - if (operation == null) - operation = val.operation(); + break; - if (operation != val.operation()) { - // Operation is changed, so we need to perform a batch. 
- batch = pending; - pending = U.newLinkedHashMap(batchSize); + case ADDED: + prevOperation = val.operation(); - operation = val.operation(); + break; - pending.put(e.getKey(), val); + default: + assert addRes == BatchingResult.SKIPPED : "Unexpected result: " + addRes; } - else - pending.put(e.getKey(), val); + } + finally { + val.writeLock().unlock(); + } + } + + // Process the remainder. + if (!pending.isEmpty()) + applyBatch(pending, true, null); + } + + /** + * Removes values from the flusher write queue and performs corresponding operation + * on the underlying store. + */ + private void flushCacheNonCoalescing() { + StoreOperation prevOperation; + Map<K, StatefulValue<K, V>> pending; + IgniteBiTuple<K, StatefulValue<K, V>> tuple; + boolean applied; + + while(!queue.isEmpty()) { + pending = U.newLinkedHashMap(batchSize); + prevOperation = null; + boolean needNewBatch = false; + + // Collect batch + while (!needNewBatch && (tuple = queue.peek()) != null) { + BatchingResult addRes = tryAddStatefulValue(pending, prevOperation, tuple.getKey(), + tuple.getValue()); + + switch (addRes) { + case ADDED: + prevOperation = tuple.getValue().operation(); + queue.poll(); + + break; + + case SKIPPED: + assert false : "Unexpected result: " + addRes; + + break; - if (pending.size() == batchSize) { - batch = pending; - pending = U.newLinkedHashMap(batchSize); + case NEW_BATCH: + needNewBatch = true; + prevOperation = null; - operation = null; + break; + + default: + assert false : "Unexpected result: " + addRes; } } - finally { - val.writeLock().unlock(); + + // Process collected batch + applied = applyBatch(pending, true, this); + + if (applied) { + // Wake up awaiting writers + flusherWriterLock.lock(); + + try { + flusherWriterCanWrite.signalAll(); + } + finally { + flusherWriterLock.unlock(); + } } + else { + // Return values to queue + ArrayList<Map.Entry<K, StatefulValue<K,V>>> pendingList = new ArrayList(pending.entrySet()); - if (batch != null && !batch.isEmpty()) { - 
applyBatch(batch, true); - batch = null; + for (int i = pendingList.size() - 1; i >= 0; i--) + queue.addFirst(F.t(pendingList.get(i).getKey(), pendingList.get(i).getValue())); } } + } - // Process the remainder. - if (!pending.isEmpty()) - applyBatch(pending, true); + /** + * Tries to add a key and stateful value pair to the pending map. + * + * @param pending Map to populate. + * @param key Key to add. + * @param val Stateful value to add. + * @return {@code BatchingResult.ADDED} if pair was successfully added, + * {@code BatchingResult.SKIPPED} if pair cannot be processed by this thread, + * {@code BatchingResult.NEW_BATCH} if pair requires a new batch (pending map) to be added. + */ + public BatchingResult tryAddStatefulValue( + Map<K, StatefulValue<K, V>> pending, + StoreOperation prevOperation, + K key, + StatefulValue<K, V> val + ) { + ValueStatus status = val.status(); + + assert !(pending.isEmpty() && prevOperation != null) : "prev operation cannot be " + prevOperation + + " if prev map is empty!"; + + if (acquired(status)) + // Another thread is helping us, continue to the next entry. + return BatchingResult.SKIPPED; + + if (!writeCoalescing && pending.containsKey(key)) + return BatchingResult.NEW_BATCH; + + if (status == ValueStatus.RETRY) + retryEntriesCnt.decrementAndGet(); + + assert retryEntriesCnt.get() >= 0; + + if (pending.size() == batchSize) + return BatchingResult.NEW_BATCH; + + // We scan for the next operation and apply batch on operation change. Null means new batch. + if (prevOperation != val.operation() && prevOperation != null) + // Operation is changed, so we need to perform a batch. + return BatchingResult.NEW_BATCH; + else { + val.status(ValueStatus.PENDING); + + pending.put(key, val); + + return BatchingResult.ADDED; + } } } @@ -861,6 +1262,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** + * For test purposes only. + * + * @return Flusher maps for the underlying store operations. 
+ */ + Map<K, StatefulValue<K,V>>[] flusherMaps() { + Map<K, StatefulValue<K,V>>[] result = new Map[flushThreadCnt]; + + for (int i=0; i < flushThreadCnt; i++) + result[i] = flushThreads[i].flusherWriteMap; + + return result; + } + + /** * Enumeration that represents possible operations on the underlying store. */ private enum StoreOperation { @@ -889,6 +1304,20 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** + * Enumeration that represents possible result of "add to batch" operation. + */ + private enum BatchingResult { + /** Added to batch */ + ADDED, + + /** Skipped. */ + SKIPPED, + + /** Need new batch. */ + NEW_BATCH + } + + /** * Checks if given status indicates pending or complete flush operation. * * @param status Status to check. @@ -901,6 +1330,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy /** * A state-value-operation trio. * + * @param <K> Key type. * @param <V> Value type. */ private static class StatefulValue<K, V> extends ReentrantReadWriteLock { @@ -949,7 +1379,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** - * @return Value status + * @return Value status. */ private ValueStatus status() { return valStatus; @@ -980,7 +1410,7 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy } /** - * Awaits a signal on flush condition + * Awaits a signal on flush condition. * * @throws IgniteInterruptedCheckedException If thread was interrupted. 
*/ @@ -1023,4 +1453,4 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy return S.toString(StatefulValue.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java index 323278f..3bac906 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractSelfTest.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.cache.store; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.GridCacheTestStore; @@ -59,16 +61,29 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm /** * Initializes store. * - * @param flushThreadCnt Count of flush threads + * @param flushThreadCnt Count of flush threads. * @throws Exception If failed. 
*/ protected void initStore(int flushThreadCnt) throws Exception { + initStore(flushThreadCnt, CacheConfiguration.DFLT_WRITE_BEHIND_COALESCING); + } + + /** + * Initializes store. + * + * @param flushThreadCnt Count of flush threads. + * @param writeCoalescing write coalescing flag. + * @throws Exception If failed. + */ + protected void initStore(int flushThreadCnt, boolean writeCoalescing) throws Exception { store = new GridCacheWriteBehindStore<>(null, "", "", log, delegate); store.setFlushFrequency(FLUSH_FREQUENCY); store.setFlushSize(CACHE_SIZE); + store.setWriteCoalescing(writeCoalescing); + store.setFlushThreadCount(flushThreadCnt); delegate.reset(); @@ -83,8 +98,11 @@ public abstract class GridCacheWriteBehindStoreAbstractSelfTest extends GridComm */ protected void shutdownStore() throws Exception { store.stop(); - - assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty()); + if (store.getWriteCoalescing()) + assertTrue("Store cache must be empty after shutdown", store.writeCache().isEmpty()); + else + for (Map<?,?> fMap : store.flusherMaps()) + assertTrue("Store flusher cache must be empty after shutdown", fMap.isEmpty()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java index ffdad5c..56ee760 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreAbstractTest.java @@ -37,6 +37,10 @@ import 
org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.Parameterized.Parameter; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java index bc6b7bd..15c58d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreMultithreadedSelfTest.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -28,12 +29,30 @@ import org.apache.ignite.internal.util.typedef.internal.U; */ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { /** + * This test performs complex set of operations on store with coalescing from multiple threads. + * + * @throws Exception If failed. 
+ */ + public void testPutGetRemoveWithCoalescing() throws Exception { + testPutGetRemove(true); + } + + /** + * This test performs complex set of operations on store without coalescing from multiple threads. + * + * @throws Exception If failed. + */ + public void testPutGetRemoveWithoutCoalescing() throws Exception { + testPutGetRemove(false); + } + + /** * This test performs complex set of operations on store from multiple threads. * * @throws Exception If failed. */ - public void testPutGetRemove() throws Exception { - initStore(2); + private void testPutGetRemove(boolean writeCoalescing) throws Exception { + initStore(2, writeCoalescing); Set<Integer> exp; @@ -63,26 +82,54 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri } /** + * Tests that cache with write coalescing would keep values if underlying store fails. + * + * @throws Exception if failed. + */ + public void testStoreFailureWithCoalescing() throws Exception { + testStoreFailure(true); + } + + /** + * Tests that cache without write coalescing would keep values if underlying store fails. + * + * @throws Exception if failed. + */ + public void testStoreFailureWithoutCoalescing() throws Exception { + testStoreFailure(false); + } + + /** * Tests that cache would keep values if underlying store fails. * * @throws Exception If failed. 
*/ - public void testStoreFailure() throws Exception { + private void testStoreFailure(boolean writeCoalescing) throws Exception { delegate.setShouldFail(true); - initStore(2); + initStore(2, writeCoalescing); Set<Integer> exp; try { + Thread timer = new Thread(new Runnable() { + @Override + public void run() { + try { + U.sleep(FLUSH_FREQUENCY+50); + } catch (IgniteInterruptedCheckedException e) { + assertTrue("Timer was interrupted", false); + } + delegate.setShouldFail(false); + } + }); + timer.start(); exp = runPutGetRemoveMultithreaded(10, 10); - U.sleep(FLUSH_FREQUENCY); + timer.join(); info(">>> There are " + store.getWriteBehindErrorRetryCount() + " entries in RETRY state"); - delegate.setShouldFail(false); - // Despite that we set shouldFail flag to false, flush thread may just have caught an exception. // If we move store to the stopping state right away, this value will be lost. That's why this sleep // is inserted here to let all exception handlers in write-behind store exit. @@ -111,16 +158,37 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri } /** + * Tests store (with write coalescing) consistency in case of high put rate, + * when flush is performed from the same thread as put or remove operation. + * + * @throws Exception If failed. + */ + public void testFlushFromTheSameThreadWithCoalescing() throws Exception { + testFlushFromTheSameThread(true); + } + + /** + * Tests store (without write coalescing) consistency in case of high put rate, + * when flush is performed from the same thread as put or remove operation. + * + * @throws Exception If failed. + */ + public void testFlushFromTheSameThreadWithoutCoalescing() throws Exception { + testFlushFromTheSameThread(false); + } + + /** * Tests store consistency in case of high put rate, when flush is performed from the same thread * as put or remove operation. * + * @param writeCoalescing write coalescing flag. * @throws Exception If failed. 
*/ - public void testFlushFromTheSameThread() throws Exception { + private void testFlushFromTheSameThread(boolean writeCoalescing) throws Exception { // 50 milliseconds should be enough. delegate.setOperationDelay(50); - initStore(2); + initStore(2, writeCoalescing); Set<Integer> exp; @@ -162,4 +230,4 @@ public class GridCacheWriteBehindStoreMultithreadedSelfTest extends GridCacheWri for (Integer key : exp) assertEquals("Invalid value for key " + key, "val" + key, map.get(key)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java index 67e26ab..9a487a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStoreSelfTest.java @@ -35,11 +35,30 @@ import org.jsr166.ConcurrentLinkedHashMap; */ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStoreAbstractSelfTest { /** - * Tests correct store shutdown when underlying store fails, + * Tests correct store (with write coalescing) shutdown when underlying store fails. * * @throws Exception If failed. */ - public void testShutdownWithFailure() throws Exception { + public void testShutdownWithFailureWithCoalescing() throws Exception { + testShutdownWithFailure(true); + } + + /** + * Tests correct store (without write coalescing) shutdown when underlying store fails. + * + * @throws Exception If failed. 
+ */ + public void testShutdownWithFailureWithoutCoalescing() throws Exception { + testShutdownWithFailure(false); + } + + /** + * Tests correct store shutdown when underlying store fails. + * + * @param writeCoalescing Write coalescing flag. + * @throws Exception If failed. + */ + private void testShutdownWithFailure(final boolean writeCoalescing) throws Exception { final AtomicReference<Exception> err = new AtomicReference<>(); multithreadedAsync(new Runnable() { @@ -47,7 +66,7 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore try { delegate.setShouldFail(true); - initStore(2); + initStore(2, writeCoalescing); try { store.write(new CacheEntryImpl<>(1, "val1")); @@ -70,10 +89,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore } /** + * Simple store (with write coalescing) test. + * + * @throws Exception If failed. + */ + public void testSimpleStoreWithCoalescing() throws Exception { + testSimpleStore(true); + } + + /** + * Simple store (without write coalescing) test. + * * @throws Exception If failed. */ - public void testSimpleStore() throws Exception { - initStore(2); + public void testSimpleStoreWithoutCoalescing() throws Exception { + testSimpleStore(false); + } + + /** + * Simple store test. + * + * @param writeCoalescing Write coalescing flag. + * @throws Exception If failed. + */ + private void testSimpleStore(boolean writeCoalescing) throws Exception { + initStore(2, writeCoalescing); try { store.write(new CacheEntryImpl<>(1, "v1")); @@ -95,14 +135,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore } /** + * Check that all values written to the store with coalescing will be in underlying store after timeout + * or due to size limits. + * + * @throws Exception If failed. 
+ */ + public void testValuePropagationWithCoalescing() throws Exception { + testValuePropagation(true); + } + + /** + * Check that all values written to the store without coalescing will be in underlying store after timeout + * or due to size limits. + * + * @throws Exception If failed. + */ + public void testValuePropagationWithoutCoalescing() throws Exception { + testValuePropagation(false); + } + + /** * Check that all values written to the store will be in underlying store after timeout or due to size limits. * + * @param writeCoalescing Write coalescing flag * @throws Exception If failed. */ @SuppressWarnings({"NullableProblems"}) - public void testValuePropagation() throws Exception { + private void testValuePropagation(boolean writeCoalescing) throws Exception { // Need to test size-based write. - initStore(1); + initStore(1, writeCoalescing); try { for (int i = 0; i < CACHE_SIZE * 2; i++) @@ -132,12 +193,31 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore } /** + * Tests store with write coalescing behaviour under continuous put of the same key with different values. + * + * @throws Exception If failed. + */ + public void testContinuousPutWithCoalescing() throws Exception { + testContinuousPut(true); + } + + /** + * Tests store without write coalescing behaviour under continuous put of the same key with different values. + * + * @throws Exception If failed. + */ + public void testContinuousPutWithoutCoalescing() throws Exception { + testContinuousPut(false); + } + + /** * Tests store behaviour under continuous put of the same key with different values. * - * @throws Exception If failed + * @param writeCoalescing Write coalescing flag for cache. + * @throws Exception If failed. 
*/ - public void testContinuousPut() throws Exception { - initStore(2); + private void testContinuousPut(boolean writeCoalescing) throws Exception { + initStore(2, writeCoalescing); try { final AtomicBoolean running = new AtomicBoolean(true); @@ -169,17 +249,22 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore }, 1, "put"); U.sleep(FLUSH_FREQUENCY * 2 + 500); + running.set(false); + U.sleep(FLUSH_FREQUENCY * 2 + 500); int delegatePutCnt = delegate.getPutAllCount(); - running.set(false); fut.get(); log().info(">>> [putCnt = " + actualPutCnt.get() + ", delegatePutCnt=" + delegatePutCnt + "]"); assertTrue("No puts were made to the underlying store", delegatePutCnt > 0); - assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); + if (store.getWriteCoalescing()) { + assertTrue("Too many puts were made to the underlying store", delegatePutCnt < actualPutCnt.get() / 10); + } else { + assertTrue("Too few puts cnt=" + actualPutCnt.get() + " << storePutCnt=" + delegatePutCnt, delegatePutCnt > actualPutCnt.get() / 2); + } } finally { shutdownStore(); @@ -193,13 +278,34 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore } /** + * Tests that all values were put into the store with write coalescing will be written to the underlying store + * after shutdown is called. + * + * @throws Exception If failed. + */ + public void testShutdownWithCoalescing() throws Exception { + testShutdown(true); + } + + /** + * Tests that all values were put into the store without write coalescing will be written to the underlying store + * after shutdown is called. + * + * @throws Exception If failed. + */ + public void testShutdownWithoutCoalescing() throws Exception { + testShutdown(false); + } + + /** * Tests that all values were put into the store will be written to the underlying store * after shutdown is called. * + * @param writeCoalescing Write coalescing flag. 
* @throws Exception If failed. */ - public void testShutdown() throws Exception { - initStore(2); + private void testShutdown(boolean writeCoalescing) throws Exception { + initStore(2, writeCoalescing); try { final AtomicBoolean running = new AtomicBoolean(true); @@ -243,14 +349,35 @@ public class GridCacheWriteBehindStoreSelfTest extends GridCacheWriteBehindStore /** * Tests that all values will be written to the underlying store + * right in the same order as they were put into the store with coalescing. + * + * @throws Exception If failed. + */ + public void testBatchApplyWithCoalescing() throws Exception { + testBatchApply(true); + } + + /** + * Tests that all values will be written to the underlying store + * right in the same order as they were put into the store without coalescing. + * + * @throws Exception If failed. + */ + public void testBatchApplyWithoutCoalescing() throws Exception { + testBatchApply(false); + } + + /** + * Tests that all values will be written to the underlying store * right in the same order as they were put into the store. * + * @param writeCoalescing Write coalescing flag. * @throws Exception If failed. 
*/ - public void testBatchApply() throws Exception { + private void testBatchApply(boolean writeCoalescing) throws Exception { delegate = new GridCacheTestStore(new ConcurrentLinkedHashMap<Integer, String>()); - initStore(1); + initStore(1, writeCoalescing); List<Integer> intList = new ArrayList<>(CACHE_SIZE); http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java new file mode 100644 index 0000000..8ea109d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/store/IgnteCacheClientWriteBehindStoreNonCoalescingTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.store; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteFuture; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; + +/** + * This class provides non write coalescing tests for {@link org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore}. 
+ */ +public class IgnteCacheClientWriteBehindStoreNonCoalescingTest extends IgniteCacheAbstractTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { + return CLOCK; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected Factory<CacheStore> cacheStoreFactory() { + return new TestIncrementStoreFactory(); + } + + /** + * @throws Exception If failed. + */ + public void testNonCoalescingIncrementing() throws Exception { + Ignite ignite = grid(0); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + assertEquals(cache.getConfiguration(CacheConfiguration.class).getCacheStoreFactory().getClass(), + TestIncrementStoreFactory.class); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 1000; i++) { + keys.add(i); + + cache.put(i, i); + } + + Collection<IgniteFuture<?>> futs = new ArrayList<>(); + + for (int i = 0; i < 100; i++) + futs.add(updateKeys(cache, keys)); + + for (IgniteFuture<?> fut : futs) + fut.get(); + } + + /** + * Update specified keys in async mode. + * + * @param cache Cache to use. + * @param keys Keys to update. + * @return IgniteFuture. + */ + private IgniteFuture<?> updateKeys(IgniteCache<Integer, Integer> cache, Set<Integer> keys) { + IgniteCache asyncCache = cache.withAsync(); + + // Using EntryProcessor.invokeAll to increment every value in place. + asyncCache.invokeAll(keys, new EntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, Object... 
arguments) + throws EntryProcessorException { + entry.setValue(entry.getValue() + 1); + + return null; + } + }); + + return asyncCache.future(); + } + + /** + * Test increment store factory. + */ + public static class TestIncrementStoreFactory implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new TestIncrementStore(); + } + } + + /** + * Test cache store to validate int value incrementing + */ + public static class TestIncrementStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) { + for (Map.Entry<Object, Object> e : storeMap.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) { + Object oldValue = storeMap.put(entry.getKey(), entry.getValue()); + + if (oldValue instanceof Integer && entry.getValue() instanceof Integer) { + Integer oldInt = (Integer)oldValue; + Integer newInt = (Integer)entry.getValue(); + + assertTrue( + "newValue(" + newInt + ") != oldValue(" + oldInt + ")+1 !", + newInt == oldInt + 1); + } + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/22580e19/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java index b4cdfa8..dff93ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java +++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheWriteBehindTestSuite.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindSto import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreReplicatedTest; import org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStoreSelfTest; import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreAtomicTest; +import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreNonCoalescingTest; import org.apache.ignite.internal.processors.cache.store.IgnteCacheClientWriteBehindStoreTxTest; /** @@ -49,6 +50,7 @@ public class IgniteCacheWriteBehindTestSuite extends TestSuite { suite.addTest(new TestSuite(GridCachePartitionedWritesTest.class)); suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreAtomicTest.class)); suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreTxTest.class)); + suite.addTest(new TestSuite(IgnteCacheClientWriteBehindStoreNonCoalescingTest.class)); return suite; }
