This is an automated email from the ASF dual-hosted git repository. lgallinat pushed a commit to branch feature/GEODE-4435 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 43b395f8764df28abe19ff6d1182beee5806f69f Author: Lynn Gallinat <lgalli...@pivotal.io> AuthorDate: Thu Feb 1 10:02:43 2018 -0800 GEODE-4435 Move queueSize stat increment to after writing to disk. --- .../apache/geode/internal/cache/DiskRegion.java | 2 +- .../apache/geode/internal/cache/DiskStoreImpl.java | 206 ++++++++++++--------- .../cache/DiskStoreImplIntegrationTest.java | 50 +++++ .../geode/internal/cache/FlusherThreadTest.java | 76 ++++++++ .../geode/internal/cache/OplogFlushTest.java | 10 +- .../geode/internal/cache/OplogJUnitTest.java | 4 +- .../internal/cache/SimpleDiskRegionJUnitTest.java | 2 +- 7 files changed, 256 insertions(+), 94 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java index 161c583..18d6072 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java @@ -535,7 +535,7 @@ public class DiskRegion extends AbstractDiskRegion { * returns the active child */ Oplog testHook_getChild() { - return getDiskStore().persistentOplogs.getChild(); + return getDiskStore().getPersistentOplogs().getChild(); } /** For Testing * */ diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index f0959b8..724ed54 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -320,7 +320,7 @@ public class DiskStoreImpl implements DiskStore { private final AtomicReference<DiskAccessException> diskException = new AtomicReference<DiskAccessException>(); - PersistentOplogSet persistentOplogs = new PersistentOplogSet(this); + private PersistentOplogSet persistentOplogs = new PersistentOplogSet(this); OverflowOplogSet overflowOplogs = new OverflowOplogSet(this); @@ -667,19 +667,19 @@ public class DiskStoreImpl implements DiskStore { private OplogSet getOplogSet(DiskRegionView drv) { if (drv.isBackup()) { - return persistentOplogs; + return getPersistentOplogs(); } else { return overflowOplogs; } } public PersistentOplogSet getPersistentOplogSet() { - return persistentOplogs; + return getPersistentOplogs(); } PersistentOplogSet getPersistentOplogSet(DiskRegionView drv) { assert drv.isBackup(); - return persistentOplogs; + return getPersistentOplogs(); } /** @@ -695,7 +695,7 @@ public class DiskStoreImpl implements DiskStore { throws RegionClearedException { DiskRegion dr = region.getDiskRegion(); DiskId id = entry.getDiskId(); - long start = async ? this.stats.startFlush() : this.stats.startWrite(); + long start = async ? this.getStats().startFlush() : this.getStats().startWrite(); if (!async) { dr.getStats().startWrite(); } @@ -751,9 +751,9 @@ public class DiskStoreImpl implements DiskStore { } } finally { if (async) { - this.stats.endFlush(start); + this.getStats().endFlush(start); } else { - dr.getStats().endWrite(start, this.stats.endWrite(start)); + dr.getStats().endWrite(start, this.getStats().endWrite(start)); dr.getStats().incWrittenBytes(id.getValueLength()); } } @@ -1079,10 +1079,10 @@ public class DiskStoreImpl implements DiskStore { // Entry will not be found in diskRegion. // So if reference has changed, do nothing. if (!dr.didClearCountChange()) { - long start = this.stats.startRemove(); + long start = this.getStats().startRemove(); OplogSet oplogSet = getOplogSet(dr); oplogSet.remove(region, entry, async, isClear); - dr.getStats().endRemove(start, this.stats.endRemove(start)); + dr.getStats().endRemove(start, this.getStats().endRemove(start)); } else { throw new RegionClearedException( LocalizedStrings.DiskRegion_CLEAR_OPERATION_ABORTING_THE_ONGOING_ENTRY_DESTRUCTION_OPERATION_FOR_ENTRY_WITH_DISKID_0 @@ -1188,7 +1188,7 @@ public class DiskStoreImpl implements DiskStore { } public void forceRoll() { - persistentOplogs.forceRoll(null); + getPersistentOplogs().forceRoll(null); } /** @@ -1290,29 +1290,29 @@ public class DiskStoreImpl implements DiskStore { } checkForFlusherThreadTermination(); if (forceAsync) { - this.asyncQueue.forcePut(item); + this.getAsyncQueue().forcePut(item); } else { - if (!this.asyncQueue.offer(item)) { + if (!this.getAsyncQueue().offer(item)) { // queue is full so do a sync write to prevent deadlock handleFullAsyncQueue(item); // return early since we didn't add it to the queue return; } } - this.stats.incQueueSize(1); + this.getStats().incQueueSize(1); } if (this.maxAsyncItems > 0) { if (checkAsyncItemLimit()) { - synchronized (this.asyncMonitor) { - this.asyncMonitor.notifyAll(); + synchronized (this.getAsyncMonitor()) { + this.getAsyncMonitor().notifyAll(); } } } } private void rmAsyncItem(Object item) { - if (this.asyncQueue.remove(item)) { - this.stats.incQueueSize(-1); + if (this.getAsyncQueue().remove(item)) { + this.getStats().incQueueSize(-1); } } @@ -1330,12 +1330,12 @@ public class DiskStoreImpl implements DiskStore { this.pendingAsyncEnqueue.incrementAndGet(); } dr.getStats().startWrite(); - return this.stats.startWrite(); + return this.getStats().startWrite(); } private void endAsyncWrite(AsyncDiskEntry ade, DiskRegion dr, long start) { this.pendingAsyncEnqueue.decrementAndGet(); - dr.getStats().endWrite(start, this.stats.endWrite(start)); + dr.getStats().endWrite(start, this.getStats().endWrite(start)); if (!ade.versionOnly) { // for versionOnly = true ade.de will be null long bytesWritten = ade.de.getDiskId().getValueLength(); @@ -1383,14 +1383,15 @@ public class DiskStoreImpl implements DiskStore { private final Object drainSync = new Object(); private ArrayList drainList = null; - private int fillDrainList() { - synchronized (this.drainSync) { - this.drainList = new ArrayList(asyncQueue.size()); - return asyncQueue.drainTo(this.drainList); + int fillDrainList() { + synchronized (this.getDrainSync()) { + ForceableLinkedBlockingQueue<Object> queue = getAsyncQueue(); + this.drainList = new ArrayList(queue.size()); + return queue.drainTo(this.drainList); } } - private ArrayList getDrainList() { + ArrayList getDrainList() { return this.drainList; } @@ -1400,7 +1401,7 @@ public class DiskStoreImpl implements DiskStore { * clearing the isPendingAsync bit on each entry in this list. */ void clearDrainList(LocalRegion r, RegionVersionVector rvv) { - synchronized (this.drainSync) { + synchronized (this.getDrainSync()) { if (this.drainList == null) return; Iterator it = this.drainList.iterator(); @@ -1470,7 +1471,7 @@ public class DiskStoreImpl implements DiskStore { this.flusherThread = new Thread( LoggingThreadGroup.createThreadGroup( LocalizedStrings.DiskRegion_DISK_WRITERS.toLocalizedString(), logger), - new FlusherThread(), thName); + new FlusherThread(this), thName); this.flusherThread.setDaemon(true); this.flusherThread.start(); } @@ -1484,9 +1485,9 @@ public class DiskStoreImpl implements DiskStore { // See bug 41141. forceFlush(); } while (this.pendingAsyncEnqueue.get() > 0); - synchronized (asyncMonitor) { + synchronized (getAsyncMonitor()) { this.stopFlusher = true; - this.asyncMonitor.notifyAll(); + this.getAsyncMonitor().notifyAll(); } while (!this.flusherThreadTerminated) { try { @@ -1536,7 +1537,7 @@ public class DiskStoreImpl implements DiskStore { } private boolean isFlusherTerminated() { - return this.stopFlusher || this.flusherThreadTerminated || this.flusherThread == null + return this.isStopFlusher() || this.flusherThreadTerminated || this.flusherThread == null || !this.flusherThread.isAlive(); } @@ -1556,32 +1557,57 @@ public class DiskStoreImpl implements DiskStore { } private void incForceFlush() { - synchronized (this.asyncMonitor) { - this.forceFlushCount.incrementAndGet(); // moved inside sync to fix bug - // 41654 - this.asyncMonitor.notifyAll(); + Object monitor = this.getAsyncMonitor(); + synchronized (monitor) { + this.getForceFlushCount().incrementAndGet(); // moved inside sync to fix bug + // 41654 + monitor.notifyAll(); } } /** * Return true if a non-zero value is found and the decrement was done. */ - private boolean checkAndClearForceFlush() { - if (stopFlusher) { + boolean checkAndClearForceFlush() { + if (isStopFlusher()) { return true; } boolean done = false; boolean result; do { - int v = this.forceFlushCount.get(); + int v = this.getForceFlushCount().get(); result = v > 0; if (result) { - done = this.forceFlushCount.compareAndSet(v, 0); + done = this.getForceFlushCount().compareAndSet(v, 0); } } while (result && !done); return result; } + Object getAsyncMonitor() { + return asyncMonitor; + } + + AtomicInteger getForceFlushCount() { + return forceFlushCount; + } + + Object getDrainSync() { + return drainSync; + } + + ForceableLinkedBlockingQueue<Object> getAsyncQueue() { + return asyncQueue; + } + + PersistentOplogSet getPersistentOplogs() { + return persistentOplogs; + } + + boolean isStopFlusher() { + return stopFlusher; + } + private class FlushPauser extends FlushNotifier { @Override public synchronized void doFlush() { @@ -1626,43 +1652,49 @@ public class DiskStoreImpl implements DiskStore { * Return true if we have enough async items to do a flush */ private boolean checkAsyncItemLimit() { - return this.asyncQueue.size() >= this.maxAsyncItems; + return this.getAsyncQueue().size() >= this.maxAsyncItems; } - private class FlusherThread implements Runnable { + protected static class FlusherThread implements Runnable { + private DiskStoreImpl diskStore; + + public FlusherThread(DiskStoreImpl diskStore) { + this.diskStore = diskStore; + } + private boolean waitUntilFlushIsReady() throws InterruptedException { - if (maxAsyncItems > 0) { - final long time = getTimeInterval(); - synchronized (asyncMonitor) { + if (diskStore.maxAsyncItems > 0) { + final long time = diskStore.getTimeInterval(); + synchronized (diskStore.getAsyncMonitor()) { if (time > 0) { long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(time); final long endTime = System.nanoTime() + nanosRemaining; - boolean done = checkAndClearForceFlush() || checkAsyncItemLimit(); + boolean done = diskStore.checkAndClearForceFlush() || diskStore.checkAsyncItemLimit(); while (!done && nanosRemaining > 0) { - TimeUnit.NANOSECONDS.timedWait(asyncMonitor, nanosRemaining); - done = checkAndClearForceFlush() || checkAsyncItemLimit(); + TimeUnit.NANOSECONDS.timedWait(diskStore.getAsyncMonitor(), nanosRemaining); + done = diskStore.checkAndClearForceFlush() || diskStore.checkAsyncItemLimit(); if (!done) { nanosRemaining = endTime - System.nanoTime(); } } } else { - boolean done = checkAndClearForceFlush() || checkAsyncItemLimit(); + boolean done = diskStore.checkAndClearForceFlush() || diskStore.checkAsyncItemLimit(); while (!done) { - asyncMonitor.wait(); - done = checkAndClearForceFlush() || checkAsyncItemLimit(); + diskStore.getAsyncMonitor().wait(); + done = diskStore.checkAndClearForceFlush() || diskStore.checkAsyncItemLimit(); } } } } else { - long time = getTimeInterval(); + long time = diskStore.getTimeInterval(); if (time > 0) { long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(time); final long endTime = System.nanoTime() + nanosRemaining; - synchronized (asyncMonitor) { - boolean done = checkAndClearForceFlush(); + synchronized (diskStore.getAsyncMonitor()) { + boolean done = diskStore.checkAndClearForceFlush(); while (!done && nanosRemaining > 0) { - TimeUnit.NANOSECONDS.timedWait(asyncMonitor, nanosRemaining); - done = checkAndClearForceFlush(); + TimeUnit.NANOSECONDS.timedWait(diskStore.getAsyncMonitor(), nanosRemaining); + done = diskStore.checkAndClearForceFlush(); if (!done) { nanosRemaining = endTime - System.nanoTime(); } @@ -1670,23 +1702,27 @@ public class DiskStoreImpl implements DiskStore { } } else { // wait for a forceFlush - synchronized (asyncMonitor) { - boolean done = checkAndClearForceFlush(); + synchronized (diskStore.getAsyncMonitor()) { + boolean done = diskStore.checkAndClearForceFlush(); while (!done) { - asyncMonitor.wait(); - done = checkAndClearForceFlush(); + diskStore.getAsyncMonitor().wait(); + done = diskStore.checkAndClearForceFlush(); } } } } - return !stopFlusher; + return !diskStore.isStopFlusher(); } private void flushChild() { - persistentOplogs.flushChild(); + diskStore.getPersistentOplogs().flushChild(); } public void run() { + doAsyncFlush(); + } + + void doAsyncFlush() { DiskAccessException fatalDae = null; if (logger.isDebugEnabled()) { logger.debug("Async writer thread started"); @@ -1694,10 +1730,9 @@ public class DiskStoreImpl implements DiskStore { boolean doingFlush = false; try { while (waitUntilFlushIsReady()) { - int drainCount = fillDrainList(); + int drainCount = diskStore.fillDrainList(); if (drainCount > 0) { - stats.incQueueSize(-drainCount); - Iterator it = getDrainList().iterator(); + Iterator it = diskStore.getDrainList().iterator(); while (it.hasNext()) { Object o = it.next(); if (o instanceof FlushNotifier) { @@ -1759,16 +1794,17 @@ public class DiskStoreImpl implements DiskStore { CacheObserverHolder.getInstance().afterWritingBytes(); } } + diskStore.getStats().incQueueSize(-drainCount); } } } catch (InterruptedException ie) { flushChild(); Thread.currentThread().interrupt(); - getCache().getCancelCriterion().checkCancelInProgress(ie); + diskStore.getCache().getCancelCriterion().checkCancelInProgress(ie); throw new IllegalStateException("Async writer thread stopping due to unexpected interrupt"); } catch (DiskAccessException dae) { boolean okToIgnore = dae.getCause() instanceof ClosedByInterruptException; - if (!okToIgnore || !stopFlusher) { + if (!okToIgnore || !diskStore.isStopFlusher()) { fatalDae = dae; } } catch (CancelException ignore) { @@ -1776,17 +1812,17 @@ public class DiskStoreImpl implements DiskStore { logger.fatal(LocalizedMessage.create(LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH), t); fatalDae = new DiskAccessException( - LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH.toLocalizedString(), t, - DiskStoreImpl.this); + LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH.toLocalizedString(), t, diskStore); } finally { if (logger.isDebugEnabled()) { - logger.debug("Async writer thread stopped. Pending opcount={}", asyncQueue.size()); + logger.debug("Async writer thread stopped. Pending opcount={}", + diskStore.getAsyncQueue().size()); } - flusherThreadTerminated = true; - stopFlusher = true; // set this before calling handleDiskAccessException + diskStore.flusherThreadTerminated = true; + diskStore.stopFlusher = true; // set this before calling handleDiskAccessException // or it will hang if (fatalDae != null) { - handleDiskAccessException(fatalDae); + diskStore.handleDiskAccessException(fatalDae); } } } @@ -1925,7 +1961,7 @@ public class DiskStoreImpl implements DiskStore { boolean finished = false; try { Map<File, DirectoryHolder> persistentBackupFiles = - persistentOplogs.findFiles(partialFileName); + getPersistentOplogs().findFiles(partialFileName); { boolean backupFilesExist = !persistentBackupFiles.isEmpty(); @@ -1964,7 +2000,7 @@ public class DiskStoreImpl implements DiskStore { cleanupOrphanedBackupDirectories(); - persistentOplogs.createOplogs(needsOplogs, persistentBackupFiles); + getPersistentOplogs().createOplogs(needsOplogs, persistentBackupFiles); finished = true; // Log a message with the disk store id, indicating whether we recovered @@ -2017,7 +2053,7 @@ public class DiskStoreImpl implements DiskStore { * removed from the stats . */ private void statsClose() { - this.stats.close(); + this.getStats().close(); if (this.directories != null) { for (final DirectoryHolder directory : this.directories) { directory.close(); @@ -2026,7 +2062,7 @@ public class DiskStoreImpl implements DiskStore { } void initializeIfNeeded() { - if (!persistentOplogs.alreadyRecoveredOnce.get()) { + if (!getPersistentOplogs().alreadyRecoveredOnce.get()) { recoverRegionsThatAreReady(); } } @@ -2039,7 +2075,7 @@ public class DiskStoreImpl implements DiskStore { * Reads the oplogs files and loads them into regions that are ready to be recovered. */ public void recoverRegionsThatAreReady() { - persistentOplogs.recoverRegionsThatAreReady(); + getPersistentOplogs().recoverRegionsThatAreReady(); } void scheduleValueRecovery(Set<Oplog> oplogsNeedingValueRecovery, @@ -2157,7 +2193,7 @@ public class DiskStoreImpl implements DiskStore { try { // Now while holding the write lock remove any elements from the queue // for this region. - for (final Object o : this.asyncQueue) { + for (final Object o : this.getAsyncQueue()) { if (o instanceof AsyncDiskEntry) { AsyncDiskEntry ade = (AsyncDiskEntry) o; if (shouldClear(region, rvv, ade)) { @@ -2311,7 +2347,7 @@ public class DiskStoreImpl implements DiskStore { } if ((!destroy && getDiskInitFile().hasLiveRegions()) || isValidating()) { - RuntimeException exception = persistentOplogs.close(); + RuntimeException exception = getPersistentOplogs().close(); if (exception != null && rte != null) { rte = exception; } @@ -2430,7 +2466,7 @@ public class DiskStoreImpl implements DiskStore { public void prepareForClose() { forceFlush(); - persistentOplogs.prepareForClose(); + getPersistentOplogs().prepareForClose(); closeCompactor(true); } @@ -2586,7 +2622,7 @@ public class DiskStoreImpl implements DiskStore { } boolean basicForceCompaction(DiskRegion dr) { - PersistentOplogSet oplogSet = persistentOplogs; + PersistentOplogSet oplogSet = getPersistentOplogs(); // see if the current active oplog is compactable; if so { Oplog active = oplogSet.getChild(); @@ -2637,7 +2673,7 @@ public class DiskStoreImpl implements DiskStore { * */ private void destroyAllOplogs() { - persistentOplogs.destroyAllOplogs(); + getPersistentOplogs().destroyAllOplogs(); // Need to also remove all oplogs that logically belong to this DiskStore // even if we were not using them. @@ -2711,7 +2747,7 @@ public class DiskStoreImpl implements DiskStore { if (!all && max > MAX_OPLOGS_PER_COMPACTION && MAX_OPLOGS_PER_COMPACTION > 0) { max = MAX_OPLOGS_PER_COMPACTION; } - persistentOplogs.getCompactableOplogs(l, max); + getPersistentOplogs().getCompactableOplogs(l, max); // Note this always puts overflow oplogs on the end of the list. // They may get starved. @@ -2728,7 +2764,7 @@ public class DiskStoreImpl implements DiskStore { * Get all of the oplogs */ public Oplog[] getAllOplogsForBackup() { - return persistentOplogs.getAllOplogs(); + return getPersistentOplogs().getAllOplogs(); } // @todo perhaps a better thing for the tests would be to give them a listener @@ -3923,8 +3959,8 @@ public class DiskStoreImpl implements DiskStore { scheduleForRecovery(OfflineCompactionDiskRegion.create(this, drv)); } - persistentOplogs.recoverRegionsThatAreReady(); - persistentOplogs.offlineCompact(); + getPersistentOplogs().recoverRegionsThatAreReady(); + getPersistentOplogs().offlineCompact(); // TODO soplogs - we need to do offline compaction for // the soplog regions, but that is not currently implemented @@ -4238,7 +4274,7 @@ public class DiskStoreImpl implements DiskStore { } public boolean hasPersistedData() { - return persistentOplogs.getChild() != null; + return getPersistentOplogs().getChild() != null; } public UUID getDiskStoreUUID() { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java index 3c82ad3..82694c6 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java @@ -21,7 +21,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -32,11 +35,15 @@ import org.junit.rules.TemporaryFolder; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskStore; +import org.apache.geode.cache.DiskStoreFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.internal.cache.backup.BackupService; import org.apache.geode.test.junit.categories.IntegrationTest; + @Category(IntegrationTest.class) public class DiskStoreImplIntegrationTest { private static final String DISK_STORE_NAME = "testDiskStore"; @@ -46,6 +53,8 @@ public class DiskStoreImplIntegrationTest { public TemporaryFolder temporaryDirectory = new TemporaryFolder(); private Cache cache; + private Region aRegion; + private DiskStoreStats diskStoreStats; @Before public void setup() { @@ -80,12 +89,53 @@ public class DiskStoreImplIntegrationTest { tempDirs.forEach(tempDir -> assertThat(Files.exists(tempDir)).isFalse()); } + @Test + public void queueSizeStatIncrementedAfterAsyncFlush() throws Exception { + File baseDir = temporaryDirectory.newFolder(); + final int QUEUE_SIZE = 50; + createRegionWithDiskStoreAndAsyncQueue(baseDir, QUEUE_SIZE); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> diskStoreStats.getQueueSize() == 0); + + putEntries(QUEUE_SIZE - 1); + Awaitility.await().atMost(1, TimeUnit.MINUTES) + .until(() -> diskStoreStats.getQueueSize() == QUEUE_SIZE - 1); + + putEntries(1); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> diskStoreStats.getQueueSize() == 0); + } + + private void putEntries(int numToPut) { + for (int i = 1; i <= numToPut; i++) { + aRegion.put(i, i); + } + } + private void createRegionWithDiskStore(File baseDir) { cache.createDiskStoreFactory().setDiskDirs(new File[] {baseDir}).create(DISK_STORE_NAME); cache.<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) .setDiskStoreName(DISK_STORE_NAME).create(REGION_NAME); } + private void createRegionWithDiskStoreAndAsyncQueue(File baseDir, int queueSize) { + createDiskStoreWithQueue(baseDir, queueSize); + + RegionFactory regionFactory = + cache.<String, String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT); + regionFactory.setDiskSynchronous(false); + regionFactory.setDiskStoreName(DISK_STORE_NAME); + aRegion = regionFactory.create(REGION_NAME); + } + + private void createDiskStoreWithQueue(File baseDir, int queueSize) { + final int TIME_INTERVAL = 300000; + DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory(); + diskStoreFactory.setDiskDirs(new File[] {baseDir}); + diskStoreFactory.setQueueSize(queueSize); + diskStoreFactory.setTimeInterval(TIME_INTERVAL); + DiskStore diskStore = diskStoreFactory.create(DISK_STORE_NAME); + diskStoreStats = ((DiskStoreImpl) diskStore).getStats(); + } + private Cache createCache() { // Setting MCAST port explicitly is currently required due to default properties set in gradle return new CacheFactory().set(ConfigurationProperties.MCAST_PORT, "0").create(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java new file mode 100644 index 0000000..7acffae --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.DiskAccessException; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class FlusherThreadTest { + + private DiskStoreImpl diskStoreImpl; + private DiskStoreStats diskStoreStats; + private final int DRAIN_LIST_SIZE = 5; + + @Before + public void setup() { + diskStoreImpl = mock(DiskStoreImpl.class); + diskStoreStats = mock(DiskStoreStats.class); + PersistentOplogSet persistentOpLogSet = mock(PersistentOplogSet.class); + + when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object()); + when(diskStoreImpl.getForceFlushCount()).thenReturn(new AtomicInteger(1)); + when(diskStoreImpl.fillDrainList()).thenReturn(DRAIN_LIST_SIZE).thenReturn(0); + when(diskStoreImpl.getDrainList()).thenReturn(new ArrayList()); + when(diskStoreImpl.getPersistentOplogs()).thenReturn(persistentOpLogSet); + when(diskStoreImpl.getStats()).thenReturn(diskStoreStats); + when(diskStoreImpl.checkAndClearForceFlush()).thenReturn(true); + when(diskStoreImpl.isStopFlusher()).thenReturn(false).thenReturn(true); + } + + @Test + public void asyncFlushIncrementsQueueSizeStat() { + DiskStoreImpl.FlusherThread flusherThread = new DiskStoreImpl.FlusherThread(diskStoreImpl); + + flusherThread.doAsyncFlush(); + + verify(diskStoreStats, times(1)).incQueueSize(-DRAIN_LIST_SIZE); + } + + @Test + public void asyncFlushDoesNotIncrementQueueSizeWhenExceptionThrown() { + DiskStoreImpl.FlusherThread flusherThread = new DiskStoreImpl.FlusherThread(diskStoreImpl); + when(diskStoreImpl.getDrainList()).thenThrow(DiskAccessException.class); + + flusherThread.doAsyncFlush(); + + verify(diskStoreStats, never()).incQueueSize(anyInt()); + } + +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java index 16df420..a7f1e27 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java @@ -152,7 +152,7 @@ public class OplogFlushTest extends DiskRegionTestingBase { public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws Exception { region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, null); DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs(); assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); assertNotNull("Unexpected null Oplog", oplogs[0]); @@ -163,7 +163,7 @@ public class OplogFlushTest extends DiskRegionTestingBase { public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception { region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs(); assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); assertNotNull("Unexpected null Oplog", oplogs[0]); @@ -174,7 +174,7 @@ public class OplogFlushTest extends DiskRegionTestingBase { public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() throws Exception { region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs(); assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); assertNotNull("Unexpected null Oplog", oplogs[0]); @@ -188,7 +188,7 @@ public class OplogFlushTest extends DiskRegionTestingBase { expectedException.expectCause(instanceOf(IOException.class)); region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, null); DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs(); assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); assertNotNull("Unexpected null Oplog", oplogs[0]); @@ -225,7 +225,7 @@ public class OplogFlushTest extends DiskRegionTestingBase { public void testOplogByteArrayFlush() throws Exception { region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, Scope.LOCAL); DiskRegion dr = ((LocalRegion) region).getDiskRegion(); - Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs(); + Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs(); assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs); assertNotNull("Unexpected null Oplog", oplogs[0]); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java index b259277..1b920da 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java @@ -420,7 +420,7 @@ public class OplogJUnitTest extends DiskRegionTestingBase { region.put(2, val); Oplog switched = dr.testHook_getChild(); assertTrue(old != switched); - assertEquals(dr.getDiskStore().persistentOplogs.getChild(2), switched); + assertEquals(dr.getDiskStore().getPersistentOplogs().getChild(2), switched); assertEquals(oldWriteBuf, switched.getWriteBuf()); assertEquals(null, old.getWriteBuf()); closeDown(); @@ -1925,7 +1925,7 @@ public class OplogJUnitTest extends DiskRegionTestingBase { private long oplogSize() { long size = ((LocalRegion) region).getDiskRegion().getDiskStore().undeletedOplogSize.get(); Oplog[] opArray = - ((LocalRegion) region).getDiskRegion().getDiskStore().persistentOplogs.getAllOplogs(); + ((LocalRegion) region).getDiskRegion().getDiskStore().getPersistentOplogs().getAllOplogs(); if (opArray != null) { for (Oplog log : opArray) { size += log.getOplogSize(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java index c19a728..d38304a 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java @@ -245,7 +245,7 @@ public class SimpleDiskRegionJUnitTest extends DiskRegionTestingBase { StatisticsFactory factory = region.getCache().getDistributedSystem(); Oplog newOplog = new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], 1000000, 0)); - dr.getDiskStore().persistentOplogs.setChild(newOplog); + dr.getDiskStore().getPersistentOplogs().setChild(newOplog); assertEquals(newOplog, dr.testHook_getChild()); dr.setChild(oplog); assertEquals(oplog, dr.testHook_getChild()); -- To stop receiving notification emails like this one, please contact lgalli...@apache.org.