This is an automated email from the ASF dual-hosted git repository.

lgallinat pushed a commit to branch feature/GEODE-4435
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 43b395f8764df28abe19ff6d1182beee5806f69f
Author: Lynn Gallinat <lgalli...@pivotal.io>
AuthorDate: Thu Feb 1 10:02:43 2018 -0800

    GEODE-4435 Move queueSize stat increment to after writing to disk.
---
 .../apache/geode/internal/cache/DiskRegion.java    |   2 +-
 .../apache/geode/internal/cache/DiskStoreImpl.java | 206 ++++++++++++---------
 .../cache/DiskStoreImplIntegrationTest.java        |  50 +++++
 .../geode/internal/cache/FlusherThreadTest.java    |  76 ++++++++
 .../geode/internal/cache/OplogFlushTest.java       |  10 +-
 .../geode/internal/cache/OplogJUnitTest.java       |   4 +-
 .../internal/cache/SimpleDiskRegionJUnitTest.java  |   2 +-
 7 files changed, 256 insertions(+), 94 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
index 161c583..18d6072 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java
@@ -535,7 +535,7 @@ public class DiskRegion extends AbstractDiskRegion {
    * returns the active child
    */
   Oplog testHook_getChild() {
-    return getDiskStore().persistentOplogs.getChild();
+    return getDiskStore().getPersistentOplogs().getChild();
   }
 
   /** For Testing * */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index f0959b8..724ed54 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -320,7 +320,7 @@ public class DiskStoreImpl implements DiskStore {
   private final AtomicReference<DiskAccessException> diskException =
       new AtomicReference<DiskAccessException>();
 
-  PersistentOplogSet persistentOplogs = new PersistentOplogSet(this);
+  private PersistentOplogSet persistentOplogs = new PersistentOplogSet(this);
 
   OverflowOplogSet overflowOplogs = new OverflowOplogSet(this);
 
@@ -667,19 +667,19 @@ public class DiskStoreImpl implements DiskStore {
 
   private OplogSet getOplogSet(DiskRegionView drv) {
     if (drv.isBackup()) {
-      return persistentOplogs;
+      return getPersistentOplogs();
     } else {
       return overflowOplogs;
     }
   }
 
   public PersistentOplogSet getPersistentOplogSet() {
-    return persistentOplogs;
+    return getPersistentOplogs();
   }
 
   PersistentOplogSet getPersistentOplogSet(DiskRegionView drv) {
     assert drv.isBackup();
-    return persistentOplogs;
+    return getPersistentOplogs();
   }
 
   /**
@@ -695,7 +695,7 @@ public class DiskStoreImpl implements DiskStore {
       throws RegionClearedException {
     DiskRegion dr = region.getDiskRegion();
     DiskId id = entry.getDiskId();
-    long start = async ? this.stats.startFlush() : this.stats.startWrite();
+    long start = async ? this.getStats().startFlush() : 
this.getStats().startWrite();
     if (!async) {
       dr.getStats().startWrite();
     }
@@ -751,9 +751,9 @@ public class DiskStoreImpl implements DiskStore {
       }
     } finally {
       if (async) {
-        this.stats.endFlush(start);
+        this.getStats().endFlush(start);
       } else {
-        dr.getStats().endWrite(start, this.stats.endWrite(start));
+        dr.getStats().endWrite(start, this.getStats().endWrite(start));
         dr.getStats().incWrittenBytes(id.getValueLength());
       }
     }
@@ -1079,10 +1079,10 @@ public class DiskStoreImpl implements DiskStore {
       // Entry will not be found in diskRegion.
       // So if reference has changed, do nothing.
       if (!dr.didClearCountChange()) {
-        long start = this.stats.startRemove();
+        long start = this.getStats().startRemove();
         OplogSet oplogSet = getOplogSet(dr);
         oplogSet.remove(region, entry, async, isClear);
-        dr.getStats().endRemove(start, this.stats.endRemove(start));
+        dr.getStats().endRemove(start, this.getStats().endRemove(start));
       } else {
         throw new RegionClearedException(
             
LocalizedStrings.DiskRegion_CLEAR_OPERATION_ABORTING_THE_ONGOING_ENTRY_DESTRUCTION_OPERATION_FOR_ENTRY_WITH_DISKID_0
@@ -1188,7 +1188,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   public void forceRoll() {
-    persistentOplogs.forceRoll(null);
+    getPersistentOplogs().forceRoll(null);
   }
 
   /**
@@ -1290,29 +1290,29 @@ public class DiskStoreImpl implements DiskStore {
       }
       checkForFlusherThreadTermination();
       if (forceAsync) {
-        this.asyncQueue.forcePut(item);
+        this.getAsyncQueue().forcePut(item);
       } else {
-        if (!this.asyncQueue.offer(item)) {
+        if (!this.getAsyncQueue().offer(item)) {
           // queue is full so do a sync write to prevent deadlock
           handleFullAsyncQueue(item);
           // return early since we didn't add it to the queue
           return;
         }
       }
-      this.stats.incQueueSize(1);
+      this.getStats().incQueueSize(1);
     }
     if (this.maxAsyncItems > 0) {
       if (checkAsyncItemLimit()) {
-        synchronized (this.asyncMonitor) {
-          this.asyncMonitor.notifyAll();
+        synchronized (this.getAsyncMonitor()) {
+          this.getAsyncMonitor().notifyAll();
         }
       }
     }
   }
 
   private void rmAsyncItem(Object item) {
-    if (this.asyncQueue.remove(item)) {
-      this.stats.incQueueSize(-1);
+    if (this.getAsyncQueue().remove(item)) {
+      this.getStats().incQueueSize(-1);
     }
   }
 
@@ -1330,12 +1330,12 @@ public class DiskStoreImpl implements DiskStore {
       this.pendingAsyncEnqueue.incrementAndGet();
     }
     dr.getStats().startWrite();
-    return this.stats.startWrite();
+    return this.getStats().startWrite();
   }
 
   private void endAsyncWrite(AsyncDiskEntry ade, DiskRegion dr, long start) {
     this.pendingAsyncEnqueue.decrementAndGet();
-    dr.getStats().endWrite(start, this.stats.endWrite(start));
+    dr.getStats().endWrite(start, this.getStats().endWrite(start));
 
     if (!ade.versionOnly) { // for versionOnly = true ade.de will be null
       long bytesWritten = ade.de.getDiskId().getValueLength();
@@ -1383,14 +1383,15 @@ public class DiskStoreImpl implements DiskStore {
   private final Object drainSync = new Object();
   private ArrayList drainList = null;
 
-  private int fillDrainList() {
-    synchronized (this.drainSync) {
-      this.drainList = new ArrayList(asyncQueue.size());
-      return asyncQueue.drainTo(this.drainList);
+  int fillDrainList() {
+    synchronized (this.getDrainSync()) {
+      ForceableLinkedBlockingQueue<Object> queue = getAsyncQueue();
+      this.drainList = new ArrayList(queue.size());
+      return queue.drainTo(this.drainList);
     }
   }
 
-  private ArrayList getDrainList() {
+  ArrayList getDrainList() {
     return this.drainList;
   }
 
@@ -1400,7 +1401,7 @@ public class DiskStoreImpl implements DiskStore {
    * clearing the isPendingAsync bit on each entry in this list.
    */
   void clearDrainList(LocalRegion r, RegionVersionVector rvv) {
-    synchronized (this.drainSync) {
+    synchronized (this.getDrainSync()) {
       if (this.drainList == null)
         return;
       Iterator it = this.drainList.iterator();
@@ -1470,7 +1471,7 @@ public class DiskStoreImpl implements DiskStore {
     this.flusherThread = new Thread(
         LoggingThreadGroup.createThreadGroup(
             LocalizedStrings.DiskRegion_DISK_WRITERS.toLocalizedString(), 
logger),
-        new FlusherThread(), thName);
+        new FlusherThread(this), thName);
     this.flusherThread.setDaemon(true);
     this.flusherThread.start();
   }
@@ -1484,9 +1485,9 @@ public class DiskStoreImpl implements DiskStore {
       // See bug 41141.
       forceFlush();
     } while (this.pendingAsyncEnqueue.get() > 0);
-    synchronized (asyncMonitor) {
+    synchronized (getAsyncMonitor()) {
       this.stopFlusher = true;
-      this.asyncMonitor.notifyAll();
+      this.getAsyncMonitor().notifyAll();
     }
     while (!this.flusherThreadTerminated) {
       try {
@@ -1536,7 +1537,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   private boolean isFlusherTerminated() {
-    return this.stopFlusher || this.flusherThreadTerminated || 
this.flusherThread == null
+    return this.isStopFlusher() || this.flusherThreadTerminated || 
this.flusherThread == null
         || !this.flusherThread.isAlive();
   }
 
@@ -1556,32 +1557,57 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   private void incForceFlush() {
-    synchronized (this.asyncMonitor) {
-      this.forceFlushCount.incrementAndGet(); // moved inside sync to fix bug
-                                              // 41654
-      this.asyncMonitor.notifyAll();
+    Object monitor = this.getAsyncMonitor();
+    synchronized (monitor) {
+      this.getForceFlushCount().incrementAndGet(); // moved inside sync to fix 
bug
+      // 41654
+      monitor.notifyAll();
     }
   }
 
   /**
    * Return true if a non-zero value is found and the decrement was done.
    */
-  private boolean checkAndClearForceFlush() {
-    if (stopFlusher) {
+  boolean checkAndClearForceFlush() {
+    if (isStopFlusher()) {
       return true;
     }
     boolean done = false;
     boolean result;
     do {
-      int v = this.forceFlushCount.get();
+      int v = this.getForceFlushCount().get();
       result = v > 0;
       if (result) {
-        done = this.forceFlushCount.compareAndSet(v, 0);
+        done = this.getForceFlushCount().compareAndSet(v, 0);
       }
     } while (result && !done);
     return result;
   }
 
+  Object getAsyncMonitor() {
+    return asyncMonitor;
+  }
+
+  AtomicInteger getForceFlushCount() {
+    return forceFlushCount;
+  }
+
+  Object getDrainSync() {
+    return drainSync;
+  }
+
+  ForceableLinkedBlockingQueue<Object> getAsyncQueue() {
+    return asyncQueue;
+  }
+
+  PersistentOplogSet getPersistentOplogs() {
+    return persistentOplogs;
+  }
+
+  boolean isStopFlusher() {
+    return stopFlusher;
+  }
+
   private class FlushPauser extends FlushNotifier {
     @Override
     public synchronized void doFlush() {
@@ -1626,43 +1652,49 @@ public class DiskStoreImpl implements DiskStore {
    * Return true if we have enough async items to do a flush
    */
   private boolean checkAsyncItemLimit() {
-    return this.asyncQueue.size() >= this.maxAsyncItems;
+    return this.getAsyncQueue().size() >= this.maxAsyncItems;
   }
 
-  private class FlusherThread implements Runnable {
+  protected static class FlusherThread implements Runnable {
+    private DiskStoreImpl diskStore;
+
+    public FlusherThread(DiskStoreImpl diskStore) {
+      this.diskStore = diskStore;
+    }
+
     private boolean waitUntilFlushIsReady() throws InterruptedException {
-      if (maxAsyncItems > 0) {
-        final long time = getTimeInterval();
-        synchronized (asyncMonitor) {
+      if (diskStore.maxAsyncItems > 0) {
+        final long time = diskStore.getTimeInterval();
+        synchronized (diskStore.getAsyncMonitor()) {
           if (time > 0) {
             long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(time);
             final long endTime = System.nanoTime() + nanosRemaining;
-            boolean done = checkAndClearForceFlush() || checkAsyncItemLimit();
+            boolean done = diskStore.checkAndClearForceFlush() || 
diskStore.checkAsyncItemLimit();
             while (!done && nanosRemaining > 0) {
-              TimeUnit.NANOSECONDS.timedWait(asyncMonitor, nanosRemaining);
-              done = checkAndClearForceFlush() || checkAsyncItemLimit();
+              TimeUnit.NANOSECONDS.timedWait(diskStore.getAsyncMonitor(), 
nanosRemaining);
+              done = diskStore.checkAndClearForceFlush() || 
diskStore.checkAsyncItemLimit();
               if (!done) {
                 nanosRemaining = endTime - System.nanoTime();
               }
             }
           } else {
-            boolean done = checkAndClearForceFlush() || checkAsyncItemLimit();
+            boolean done = diskStore.checkAndClearForceFlush() || 
diskStore.checkAsyncItemLimit();
             while (!done) {
-              asyncMonitor.wait();
-              done = checkAndClearForceFlush() || checkAsyncItemLimit();
+              diskStore.getAsyncMonitor().wait();
+              done = diskStore.checkAndClearForceFlush() || 
diskStore.checkAsyncItemLimit();
             }
           }
         }
       } else {
-        long time = getTimeInterval();
+        long time = diskStore.getTimeInterval();
         if (time > 0) {
           long nanosRemaining = TimeUnit.MILLISECONDS.toNanos(time);
           final long endTime = System.nanoTime() + nanosRemaining;
-          synchronized (asyncMonitor) {
-            boolean done = checkAndClearForceFlush();
+          synchronized (diskStore.getAsyncMonitor()) {
+            boolean done = diskStore.checkAndClearForceFlush();
             while (!done && nanosRemaining > 0) {
-              TimeUnit.NANOSECONDS.timedWait(asyncMonitor, nanosRemaining);
-              done = checkAndClearForceFlush();
+              TimeUnit.NANOSECONDS.timedWait(diskStore.getAsyncMonitor(), 
nanosRemaining);
+              done = diskStore.checkAndClearForceFlush();
               if (!done) {
                 nanosRemaining = endTime - System.nanoTime();
               }
@@ -1670,23 +1702,27 @@ public class DiskStoreImpl implements DiskStore {
           }
         } else {
           // wait for a forceFlush
-          synchronized (asyncMonitor) {
-            boolean done = checkAndClearForceFlush();
+          synchronized (diskStore.getAsyncMonitor()) {
+            boolean done = diskStore.checkAndClearForceFlush();
             while (!done) {
-              asyncMonitor.wait();
-              done = checkAndClearForceFlush();
+              diskStore.getAsyncMonitor().wait();
+              done = diskStore.checkAndClearForceFlush();
             }
           }
         }
       }
-      return !stopFlusher;
+      return !diskStore.isStopFlusher();
     }
 
     private void flushChild() {
-      persistentOplogs.flushChild();
+      diskStore.getPersistentOplogs().flushChild();
     }
 
     public void run() {
+      doAsyncFlush();
+    }
+
+    void doAsyncFlush() {
       DiskAccessException fatalDae = null;
       if (logger.isDebugEnabled()) {
         logger.debug("Async writer thread started");
@@ -1694,10 +1730,9 @@ public class DiskStoreImpl implements DiskStore {
       boolean doingFlush = false;
       try {
         while (waitUntilFlushIsReady()) {
-          int drainCount = fillDrainList();
+          int drainCount = diskStore.fillDrainList();
           if (drainCount > 0) {
-            stats.incQueueSize(-drainCount);
-            Iterator it = getDrainList().iterator();
+            Iterator it = diskStore.getDrainList().iterator();
             while (it.hasNext()) {
               Object o = it.next();
               if (o instanceof FlushNotifier) {
@@ -1759,16 +1794,17 @@ public class DiskStoreImpl implements DiskStore {
                 CacheObserverHolder.getInstance().afterWritingBytes();
               }
             }
+            diskStore.getStats().incQueueSize(-drainCount);
           }
         }
       } catch (InterruptedException ie) {
         flushChild();
         Thread.currentThread().interrupt();
-        getCache().getCancelCriterion().checkCancelInProgress(ie);
+        diskStore.getCache().getCancelCriterion().checkCancelInProgress(ie);
         throw new IllegalStateException("Async writer thread stopping due to 
unexpected interrupt");
       } catch (DiskAccessException dae) {
         boolean okToIgnore = dae.getCause() instanceof 
ClosedByInterruptException;
-        if (!okToIgnore || !stopFlusher) {
+        if (!okToIgnore || !diskStore.isStopFlusher()) {
           fatalDae = dae;
         }
       } catch (CancelException ignore) {
@@ -1776,17 +1812,17 @@ public class DiskStoreImpl implements DiskStore {
         
logger.fatal(LocalizedMessage.create(LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH),
             t);
         fatalDae = new DiskAccessException(
-            
LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH.toLocalizedString(), t,
-            DiskStoreImpl.this);
+            
LocalizedStrings.DiskStoreImpl_FATAL_ERROR_ON_FLUSH.toLocalizedString(), t, 
diskStore);
       } finally {
         if (logger.isDebugEnabled()) {
-          logger.debug("Async writer thread stopped. Pending opcount={}", 
asyncQueue.size());
+          logger.debug("Async writer thread stopped. Pending opcount={}",
+              diskStore.getAsyncQueue().size());
         }
-        flusherThreadTerminated = true;
-        stopFlusher = true; // set this before calling 
handleDiskAccessException
+        diskStore.flusherThreadTerminated = true;
+        diskStore.stopFlusher = true; // set this before calling 
handleDiskAccessException
         // or it will hang
         if (fatalDae != null) {
-          handleDiskAccessException(fatalDae);
+          diskStore.handleDiskAccessException(fatalDae);
         }
       }
     }
@@ -1925,7 +1961,7 @@ public class DiskStoreImpl implements DiskStore {
     boolean finished = false;
     try {
       Map<File, DirectoryHolder> persistentBackupFiles =
-          persistentOplogs.findFiles(partialFileName);
+          getPersistentOplogs().findFiles(partialFileName);
       {
 
         boolean backupFilesExist = !persistentBackupFiles.isEmpty();
@@ -1964,7 +2000,7 @@ public class DiskStoreImpl implements DiskStore {
 
       cleanupOrphanedBackupDirectories();
 
-      persistentOplogs.createOplogs(needsOplogs, persistentBackupFiles);
+      getPersistentOplogs().createOplogs(needsOplogs, persistentBackupFiles);
       finished = true;
 
       // Log a message with the disk store id, indicating whether we recovered
@@ -2017,7 +2053,7 @@ public class DiskStoreImpl implements DiskStore {
    * removed from the stats .
    */
   private void statsClose() {
-    this.stats.close();
+    this.getStats().close();
     if (this.directories != null) {
       for (final DirectoryHolder directory : this.directories) {
         directory.close();
@@ -2026,7 +2062,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   void initializeIfNeeded() {
-    if (!persistentOplogs.alreadyRecoveredOnce.get()) {
+    if (!getPersistentOplogs().alreadyRecoveredOnce.get()) {
       recoverRegionsThatAreReady();
     }
   }
@@ -2039,7 +2075,7 @@ public class DiskStoreImpl implements DiskStore {
    * Reads the oplogs files and loads them into regions that are ready to be 
recovered.
    */
   public void recoverRegionsThatAreReady() {
-    persistentOplogs.recoverRegionsThatAreReady();
+    getPersistentOplogs().recoverRegionsThatAreReady();
   }
 
   void scheduleValueRecovery(Set<Oplog> oplogsNeedingValueRecovery,
@@ -2157,7 +2193,7 @@ public class DiskStoreImpl implements DiskStore {
     try {
       // Now while holding the write lock remove any elements from the queue
       // for this region.
-      for (final Object o : this.asyncQueue) {
+      for (final Object o : this.getAsyncQueue()) {
         if (o instanceof AsyncDiskEntry) {
           AsyncDiskEntry ade = (AsyncDiskEntry) o;
           if (shouldClear(region, rvv, ade)) {
@@ -2311,7 +2347,7 @@ public class DiskStoreImpl implements DiskStore {
       }
 
       if ((!destroy && getDiskInitFile().hasLiveRegions()) || isValidating()) {
-        RuntimeException exception = persistentOplogs.close();
+        RuntimeException exception = getPersistentOplogs().close();
         if (exception != null && rte != null) {
           rte = exception;
         }
@@ -2430,7 +2466,7 @@ public class DiskStoreImpl implements DiskStore {
 
   public void prepareForClose() {
     forceFlush();
-    persistentOplogs.prepareForClose();
+    getPersistentOplogs().prepareForClose();
     closeCompactor(true);
   }
 
@@ -2586,7 +2622,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   boolean basicForceCompaction(DiskRegion dr) {
-    PersistentOplogSet oplogSet = persistentOplogs;
+    PersistentOplogSet oplogSet = getPersistentOplogs();
     // see if the current active oplog is compactable; if so
     {
       Oplog active = oplogSet.getChild();
@@ -2637,7 +2673,7 @@ public class DiskStoreImpl implements DiskStore {
    *
    */
   private void destroyAllOplogs() {
-    persistentOplogs.destroyAllOplogs();
+    getPersistentOplogs().destroyAllOplogs();
 
     // Need to also remove all oplogs that logically belong to this DiskStore
     // even if we were not using them.
@@ -2711,7 +2747,7 @@ public class DiskStoreImpl implements DiskStore {
     if (!all && max > MAX_OPLOGS_PER_COMPACTION && MAX_OPLOGS_PER_COMPACTION > 
0) {
       max = MAX_OPLOGS_PER_COMPACTION;
     }
-    persistentOplogs.getCompactableOplogs(l, max);
+    getPersistentOplogs().getCompactableOplogs(l, max);
 
     // Note this always puts overflow oplogs on the end of the list.
     // They may get starved.
@@ -2728,7 +2764,7 @@ public class DiskStoreImpl implements DiskStore {
    * Get all of the oplogs
    */
   public Oplog[] getAllOplogsForBackup() {
-    return persistentOplogs.getAllOplogs();
+    return getPersistentOplogs().getAllOplogs();
   }
 
   // @todo perhaps a better thing for the tests would be to give them a 
listener
@@ -3923,8 +3959,8 @@ public class DiskStoreImpl implements DiskStore {
       scheduleForRecovery(OfflineCompactionDiskRegion.create(this, drv));
     }
 
-    persistentOplogs.recoverRegionsThatAreReady();
-    persistentOplogs.offlineCompact();
+    getPersistentOplogs().recoverRegionsThatAreReady();
+    getPersistentOplogs().offlineCompact();
 
     // TODO soplogs - we need to do offline compaction for
     // the soplog regions, but that is not currently implemented
@@ -4238,7 +4274,7 @@ public class DiskStoreImpl implements DiskStore {
   }
 
   public boolean hasPersistedData() {
-    return persistentOplogs.getChild() != null;
+    return getPersistentOplogs().getChild() != null;
   }
 
   public UUID getDiskStoreUUID() {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
index 3c82ad3..82694c6 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DiskStoreImplIntegrationTest.java
@@ -21,7 +21,10 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -32,11 +35,15 @@ import org.junit.rules.TemporaryFolder;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.cache.backup.BackupService;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
+
 @Category(IntegrationTest.class)
 public class DiskStoreImplIntegrationTest {
   private static final String DISK_STORE_NAME = "testDiskStore";
@@ -46,6 +53,8 @@ public class DiskStoreImplIntegrationTest {
   public TemporaryFolder temporaryDirectory = new TemporaryFolder();
 
   private Cache cache;
+  private Region aRegion;
+  private DiskStoreStats diskStoreStats;
 
   @Before
   public void setup() {
@@ -80,12 +89,53 @@ public class DiskStoreImplIntegrationTest {
     tempDirs.forEach(tempDir -> assertThat(Files.exists(tempDir)).isFalse());
   }
 
+  @Test
+  public void queueSizeStatIncrementedAfterAsyncFlush() throws Exception {
+    File baseDir = temporaryDirectory.newFolder();
+    final int QUEUE_SIZE = 50;
+    createRegionWithDiskStoreAndAsyncQueue(baseDir, QUEUE_SIZE);
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
diskStoreStats.getQueueSize() == 0);
+
+    putEntries(QUEUE_SIZE - 1);
+    Awaitility.await().atMost(1, TimeUnit.MINUTES)
+        .until(() -> diskStoreStats.getQueueSize() == QUEUE_SIZE - 1);
+
+    putEntries(1);
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> 
diskStoreStats.getQueueSize() == 0);
+  }
+
+  private void putEntries(int numToPut) {
+    for (int i = 1; i <= numToPut; i++) {
+      aRegion.put(i, i);
+    }
+  }
+
   private void createRegionWithDiskStore(File baseDir) {
     cache.createDiskStoreFactory().setDiskDirs(new File[] 
{baseDir}).create(DISK_STORE_NAME);
     cache.<String, 
String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT)
         .setDiskStoreName(DISK_STORE_NAME).create(REGION_NAME);
   }
 
+  private void createRegionWithDiskStoreAndAsyncQueue(File baseDir, int 
queueSize) {
+    createDiskStoreWithQueue(baseDir, queueSize);
+
+    RegionFactory regionFactory =
+        cache.<String, 
String>createRegionFactory(RegionShortcut.PARTITION_PERSISTENT);
+    regionFactory.setDiskSynchronous(false);
+    regionFactory.setDiskStoreName(DISK_STORE_NAME);
+    aRegion = regionFactory.create(REGION_NAME);
+  }
+
+  private void createDiskStoreWithQueue(File baseDir, int queueSize) {
+    final int TIME_INTERVAL = 300000;
+    DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+    diskStoreFactory.setDiskDirs(new File[] {baseDir});
+    diskStoreFactory.setQueueSize(queueSize);
+    diskStoreFactory.setTimeInterval(TIME_INTERVAL);
+    DiskStore diskStore = diskStoreFactory.create(DISK_STORE_NAME);
+    diskStoreStats = ((DiskStoreImpl) diskStore).getStats();
+  }
+
   private Cache createCache() {
     // Setting MCAST port explicitly is currently required due to default 
properties set in gradle
     return new CacheFactory().set(ConfigurationProperties.MCAST_PORT, 
"0").create();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
new file mode 100644
index 0000000..7acffae
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FlusherThreadTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.DiskAccessException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class FlusherThreadTest {
+
+  private DiskStoreImpl diskStoreImpl;
+  private DiskStoreStats diskStoreStats;
+  private final int DRAIN_LIST_SIZE = 5;
+
+  @Before
+  public void setup() {
+    diskStoreImpl = mock(DiskStoreImpl.class);
+    diskStoreStats = mock(DiskStoreStats.class);
+    PersistentOplogSet persistentOpLogSet = mock(PersistentOplogSet.class);
+
+    when(diskStoreImpl.getAsyncMonitor()).thenReturn(new Object());
+    when(diskStoreImpl.getForceFlushCount()).thenReturn(new AtomicInteger(1));
+    
when(diskStoreImpl.fillDrainList()).thenReturn(DRAIN_LIST_SIZE).thenReturn(0);
+    when(diskStoreImpl.getDrainList()).thenReturn(new ArrayList());
+    when(diskStoreImpl.getPersistentOplogs()).thenReturn(persistentOpLogSet);
+    when(diskStoreImpl.getStats()).thenReturn(diskStoreStats);
+    when(diskStoreImpl.checkAndClearForceFlush()).thenReturn(true);
+    when(diskStoreImpl.isStopFlusher()).thenReturn(false).thenReturn(true);
+  }
+
+  @Test
+  public void asyncFlushIncrementsQueueSizeStat() {
+    DiskStoreImpl.FlusherThread flusherThread = new 
DiskStoreImpl.FlusherThread(diskStoreImpl);
+
+    flusherThread.doAsyncFlush();
+
+    verify(diskStoreStats, times(1)).incQueueSize(-DRAIN_LIST_SIZE);
+  }
+
+  @Test
+  public void asyncFlushDoesNotIncrementQueueSizeWhenExceptionThrown() {
+    DiskStoreImpl.FlusherThread flusherThread = new 
DiskStoreImpl.FlusherThread(diskStoreImpl);
+    when(diskStoreImpl.getDrainList()).thenThrow(DiskAccessException.class);
+
+    flusherThread.doAsyncFlush();
+
+    verify(diskStoreStats, never()).incQueueSize(anyInt());
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
index 16df420..a7f1e27 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogFlushTest.java
@@ -152,7 +152,7 @@ public class OplogFlushTest extends DiskRegionTestingBase {
   public void testAsyncChannelWriteRetriesOnFailureDuringFlush() throws 
Exception {
     region = DiskRegionHelperFactory.getAsyncOverFlowAndPersistRegion(cache, 
null);
     DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs();
     assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
     assertNotNull("Unexpected null Oplog", oplogs[0]);
 
@@ -163,7 +163,7 @@ public class OplogFlushTest extends DiskRegionTestingBase {
   public void testChannelWriteRetriesOnFailureDuringFlush() throws Exception {
     region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
     DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs();
     assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
     assertNotNull("Unexpected null Oplog", oplogs[0]);
 
@@ -174,7 +174,7 @@ public class OplogFlushTest extends DiskRegionTestingBase {
   public void testChannelRecoversFromWriteFailureRepeatedRetriesDuringFlush() 
throws Exception {
     region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
     DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs();
     assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
     assertNotNull("Unexpected null Oplog", oplogs[0]);
 
@@ -188,7 +188,7 @@ public class OplogFlushTest extends DiskRegionTestingBase {
     expectedException.expectCause(instanceOf(IOException.class));
     region = DiskRegionHelperFactory.getSyncOverFlowAndPersistRegion(cache, 
null);
     DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs();
     assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
     assertNotNull("Unexpected null Oplog", oplogs[0]);
 
@@ -225,7 +225,7 @@ public class OplogFlushTest extends DiskRegionTestingBase {
   public void testOplogByteArrayFlush() throws Exception {
     region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, null, 
Scope.LOCAL);
     DiskRegion dr = ((LocalRegion) region).getDiskRegion();
-    Oplog[] oplogs = dr.getDiskStore().persistentOplogs.getAllOplogs();
+    Oplog[] oplogs = dr.getDiskStore().getPersistentOplogs().getAllOplogs();
     assertNotNull("Unexpected null Oplog[] for " + dr.getName(), oplogs);
     assertNotNull("Unexpected null Oplog", oplogs[0]);
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java
index b259277..1b920da 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogJUnitTest.java
@@ -420,7 +420,7 @@ public class OplogJUnitTest extends DiskRegionTestingBase {
     region.put(2, val);
     Oplog switched = dr.testHook_getChild();
     assertTrue(old != switched);
-    assertEquals(dr.getDiskStore().persistentOplogs.getChild(2), switched);
+    assertEquals(dr.getDiskStore().getPersistentOplogs().getChild(2), 
switched);
     assertEquals(oldWriteBuf, switched.getWriteBuf());
     assertEquals(null, old.getWriteBuf());
     closeDown();
@@ -1925,7 +1925,7 @@ public class OplogJUnitTest extends DiskRegionTestingBase {
   private long oplogSize() {
     long size = ((LocalRegion) 
region).getDiskRegion().getDiskStore().undeletedOplogSize.get();
     Oplog[] opArray =
-        ((LocalRegion) 
region).getDiskRegion().getDiskStore().persistentOplogs.getAllOplogs();
+        ((LocalRegion) 
region).getDiskRegion().getDiskStore().getPersistentOplogs().getAllOplogs();
     if (opArray != null) {
       for (Oplog log : opArray) {
         size += log.getOplogSize();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java
index c19a728..d38304a 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/SimpleDiskRegionJUnitTest.java
@@ -245,7 +245,7 @@ public class SimpleDiskRegionJUnitTest extends DiskRegionTestingBase {
     StatisticsFactory factory = region.getCache().getDistributedSystem();
     Oplog newOplog =
         new Oplog(id, dr.getOplogSet(), new DirectoryHolder(factory, dirs[0], 
1000000, 0));
-    dr.getDiskStore().persistentOplogs.setChild(newOplog);
+    dr.getDiskStore().getPersistentOplogs().setChild(newOplog);
     assertEquals(newOplog, dr.testHook_getChild());
     dr.setChild(oplog);
     assertEquals(oplog, dr.testHook_getChild());

-- 
To stop receiving notification emails like this one, please contact
lgalli...@apache.org.

Reply via email to