GEODE-2230: Added AsyncEventQueue and GatewaySender waitUntilFlushed API

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/44cd72d8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/44cd72d8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/44cd72d8

Branch: refs/heads/master
Commit: 44cd72d8502d278a9a328a2b0a825c2460f8a383
Parents: a15daf0
Author: Barry Oglesby <bogle...@pivotal.io>
Authored: Thu Jan 19 17:07:09 2017 -0800
Committer: Barry Oglesby <bogle...@pivotal.io>
Committed: Tue Jan 24 16:52:37 2017 -0800

----------------------------------------------------------------------
 .../internal/AsyncEventQueueImpl.java           |   5 +
 .../org/apache/geode/internal/DSFIDFactory.java |   6 +-
 .../geode/internal/DataSerializableFixedID.java |   2 +
 .../geode/internal/cache/BucketRegionQueue.java |  43 +++
 .../cache/wan/AbstractGatewaySender.java        |  47 ++-
 ...aitUntilGatewaySenderFlushedCoordinator.java |  43 +++
 ...ParallelGatewaySenderFlushedCoordinator.java | 323 +++++++++++++++++++
 .../geode/internal/i18n/LocalizedStrings.java   |   9 +
 ...atewaySenderFlushedCoordinatorJUnitTest.java |  62 ++++
 ...atewaySenderFlushedCoordinatorJUnitTest.java | 126 ++++++++
 .../sanctionedDataSerializables.txt             |   4 +
 .../apache/geode/cache/lucene/LuceneIndex.java  |   9 +-
 .../cache/lucene/internal/LuceneIndexImpl.java  |  24 +-
 .../internal/xml/LuceneIndexCreation.java       |   3 +-
 .../LuceneIndexMaintenanceIntegrationTest.java  |  24 +-
 .../geode/cache/lucene/LuceneQueriesBase.java   |   3 +-
 .../lucene/LuceneQueriesIntegrationTest.java    |  22 +-
 ...LuceneQueriesPersistenceIntegrationTest.java |   3 +-
 .../internal/LuceneIndexImplJUnitTest.java      |   8 +-
 .../LuceneIndexRecoveryHAIntegrationTest.java   |  11 +-
 .../cli/LuceneIndexCommandsDUnitTest.java       |   3 +-
 .../DumpDirectoryFilesIntegrationTest.java      |   3 +-
 .../cache/lucene/test/LuceneTestUtilities.java  |   6 +-
 23 files changed, 722 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index 2eb53be..3b99f1c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -17,6 +17,7 @@ package org.apache.geode.cache.asyncqueue.internal;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
@@ -202,4 +203,8 @@ public class AsyncEventQueueImpl implements AsyncEventQueue 
{
   public boolean isForwardExpirationDestroy() {
     return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy();
   }
+
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, 
unit);
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java 
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index e13a2ad..bb29239 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -23,7 +23,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.NotSerializableException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
@@ -76,7 +75,6 @@ import 
org.apache.geode.cache.query.internal.types.StructTypeImpl;
 import org.apache.geode.distributed.internal.DistributionAdvisor;
 import org.apache.geode.distributed.internal.HighPriorityAckedMessage;
 import org.apache.geode.distributed.internal.ReplyMessage;
-import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.SerialAckedMessage;
 import org.apache.geode.distributed.internal.ShutdownMessage;
 import org.apache.geode.distributed.internal.StartupMessage;
@@ -393,11 +391,11 @@ import 
org.apache.geode.internal.cache.versions.VMVersionTag;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument;
 import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import 
org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.BatchRemovalReplyMessage;
 import 
org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage;
 import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation;
-import org.apache.geode.internal.i18n.LocalizedStrings;
 import 
org.apache.geode.management.internal.JmxManagerAdvisor.JmxManagerProfile;
 import 
org.apache.geode.management.internal.JmxManagerAdvisor.JmxManagerProfileMessage;
 import org.apache.geode.management.internal.JmxManagerLocatorRequest;
@@ -924,6 +922,8 @@ public final class DSFIDFactory implements 
DataSerializableFixedID {
     registerDSFID(PR_DESTROY_ON_DATA_STORE_MESSAGE, 
DestroyRegionOnDataStoreMessage.class);
     registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST, 
ShutdownAllGatewayHubsRequest.class);
     registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class);
+    registerDSFID(WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE,
+        
WaitUntilParallelGatewaySenderFlushedCoordinator.WaitUntilGatewaySenderFlushedMessage.class);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index 24b452d..84eb8e9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -810,6 +810,8 @@ public interface DataSerializableFixedID extends 
SerializationVersions {
   public static final short LUCENE_TOP_ENTRIES = 2175;
   public static final short LUCENE_TOP_ENTRIES_COLLECTOR = 2176;
 
+  public static final short WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE = 2177;
+
   // NOTE, codes > 65535 will take 4 bytes to serialize
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 7fea789..4830912 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -30,6 +30,8 @@ import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -78,6 +80,10 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
 
   private long lastKeyRecovered;
 
+  private AtomicLong latestQueuedKey = new AtomicLong();
+
+  private AtomicLong latestAcknowledgedKey = new AtomicLong();
+
   /**
    * @param regionName
    * @param attrs
@@ -437,6 +443,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     if (didPut) {
       if (this.initialized) {
         this.eventSeqNumQueue.add(key);
+        updateLargestQueuedKey((Long) key);
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Put successfully in the queue : {} was initialized: {}",
@@ -448,6 +455,41 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
     }
   }
 
+  private void updateLargestQueuedKey(Long key) {
+    Atomics.setIfGreater(this.latestQueuedKey, key);
+  }
+
+  private void setLatestAcknowledgedKey(Long key) {
+    this.latestAcknowledgedKey.set(key);
+  }
+
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("BucketRegionQueue: waitUntilFlushed bucket=" + getId() + 
"; time="
+          + System.currentTimeMillis() + "; timeout=" + timeout + "; unit=" + 
unit);
+    }
+    boolean result = false;
+    // Wait until latestAcknowledgedKey > latestQueuedKey or the queue is empty
+    if (this.initialized) {
+      long latestQueuedKeyToCheck = this.latestQueuedKey.get();
+      long nanosRemaining = unit.toNanos(timeout);
+      long endTime = System.nanoTime() + nanosRemaining;
+      while (nanosRemaining > 0) {
+        if (latestAcknowledgedKey.get() > latestQueuedKeyToCheck || isEmpty()) 
{
+          result = true;
+          break;
+        }
+        Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(nanosRemaining) + 
1, 100));
+        nanosRemaining = endTime - System.nanoTime();
+      }
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("BucketRegionQueue: waitUntilFlushed completed bucket=" + 
getId() + "; time="
+          + System.currentTimeMillis() + "; result=" + result);
+    }
+    return result;
+  }
+
   /**
    * It removes the first key from the queue.
    * 
@@ -503,6 +545,7 @@ public class BucketRegionQueue extends 
AbstractBucketRegionQueue {
       event.setEventId(new EventID(cache.getSystem()));
       event.setRegion(this);
       basicDestroy(event, true, null);
+      setLatestAcknowledgedKey((Long) key);
       checkReadiness();
     } catch (EntryNotFoundException enf) {
       if (getPartitionedRegion().isDestroyed()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 2c9a65d..1f8704c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -22,9 +22,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import 
org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -33,15 +37,12 @@ import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.RegionExistsException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
-import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats;
 import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.wan.GatewayEventFilter;
@@ -675,7 +676,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     }
   }
 
-  final public RegionQueue getQueue() {
+  public RegionQueue getQueue() {
     if (this.eventProcessor != null) {
       if (!(this.eventProcessor instanceof 
ConcurrentSerialGatewaySenderEventProcessor)) {
         return this.eventProcessor.getQueue();
@@ -1089,7 +1090,7 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return substituteValue;
   }
 
-  private void initializeEventIdIndex() {
+  protected void initializeEventIdIndex() {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
     boolean gotLock = false;
@@ -1240,6 +1241,42 @@ public abstract class AbstractGatewaySender implements 
GatewaySender, Distributi
     return lifeCycleLock;
   }
 
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
+    int attempts = 0;
+    boolean result = false;
+    if (isParallel()) {
+      // Wait until the sender is flushed. Retry if necessary.
+      while (true) {
+        try {
+          WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+              new WaitUntilParallelGatewaySenderFlushedCoordinator(this, 
timeout, unit, true);
+          result = coordinator.waitUntilFlushed();
+          break;
+        } catch (BucketMovedException | CancelException | 
RegionDestroyedException e) {
+          attempts++;
+          logger.warn(
+              
LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETRYING
+                  .toLocalizedString(),
+              e);
+          Thread.sleep(100);
+        } catch (Throwable t) {
+          attempts++;
+          logger.warn(
+              
LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETURNING
+                  .toLocalizedString(),
+              t);
+          throw new InternalGemFireError(t);
+        }
+      }
+      return result;
+    } else {
+      // Serial senders are currently not supported
+      throw new UnsupportedOperationException(
+          
LocalizedStrings.AbstractGatewaySender_WAIT_UNTIL_FLUSHED_NOT_SUPPORTED_FOR_SERIAL_SENDERS
+              .toLocalizedString());
+    }
+  }
+
   /**
    * Has a reference to a GatewayEventImpl and has a timeout value.
    */

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java
new file mode 100644
index 0000000..07c4e19
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+
+public abstract class WaitUntilGatewaySenderFlushedCoordinator {
+
+  protected AbstractGatewaySender sender;
+
+  protected long timeout;
+
+  protected TimeUnit unit;
+
+  protected boolean initiator;
+
+  protected static final Logger logger = LogService.getLogger();
+
+  public WaitUntilGatewaySenderFlushedCoordinator(AbstractGatewaySender 
sender, long timeout,
+      TimeUnit unit, boolean initiator) {
+    this.sender = sender;
+    this.timeout = timeout;
+    this.unit = unit;
+    this.initiator = initiator;
+  }
+
+  public abstract boolean waitUntilFlushed() throws Throwable;
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
new file mode 100644
index 0000000..a4c03a9
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.*;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+public class WaitUntilParallelGatewaySenderFlushedCoordinator
+    extends WaitUntilGatewaySenderFlushedCoordinator {
+
+  public 
WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender,
+      long timeout, TimeUnit unit, boolean initiator) {
+    super(sender, timeout, unit, initiator);
+  }
+
+  public boolean waitUntilFlushed() throws Throwable {
+    boolean remoteResult = true, localResult = true;
+    Throwable exceptionToThrow = null;
+    ConcurrentParallelGatewaySenderQueue prq =
+        (ConcurrentParallelGatewaySenderQueue) this.sender.getQueue();
+    PartitionedRegion pr = (PartitionedRegion) prq.getRegion();
+
+    // Create callables for local buckets
+    List<WaitUntilBucketRegionQueueFlushedCallable> callables =
+        buildWaitUntilBucketRegionQueueFlushedCallables(pr);
+
+    // Submit local callables for execution
+    ExecutorService service = 
this.sender.getDistributionManager().getWaitingThreadPool();
+    List<Future<Boolean>> callableFutures = new ArrayList<>();
+    for (Callable<Boolean> callable : callables) {
+      callableFutures.add(service.submit(callable));
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created 
and submitted "
+          + callables.size() + " callables=" + callables);
+    }
+
+    // Send message to remote buckets
+    if (this.initiator) {
+      remoteResult = false;
+      try {
+        remoteResult = waitUntilFlushedOnRemoteMembers(pr);
+      } catch (Throwable t) {
+        exceptionToThrow = t;
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Processed remote result="
+            + remoteResult + "; exceptionToThrow=" + exceptionToThrow);
+      }
+    }
+
+    // Process local future results
+    for (Future<Boolean> future : callableFutures) {
+      boolean singleBucketResult = false;
+      try {
+        singleBucketResult = future.get();
+      } catch (ExecutionException e) {
+        exceptionToThrow = e.getCause();
+      }
+      localResult = localResult && singleBucketResult;
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Processed local result="
+          + localResult + "; exceptionToThrow=" + exceptionToThrow);
+    }
+
+    // Return the full result
+    if (exceptionToThrow == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Returning full result="
+            + (remoteResult && localResult));
+      }
+      return remoteResult && localResult;
+    } else {
+      throw exceptionToThrow;
+    }
+  }
+
+  protected List<WaitUntilBucketRegionQueueFlushedCallable> 
buildWaitUntilBucketRegionQueueFlushedCallables(
+      PartitionedRegion pr) {
+    List<WaitUntilBucketRegionQueueFlushedCallable> callables = new 
ArrayList<>();
+    if (pr.isDataStore()) {
+      for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) {
+        callables.add(new 
WaitUntilBucketRegionQueueFlushedCallable((BucketRegionQueue) br,
+            this.timeout, this.unit));
+      }
+    }
+    return callables;
+  }
+
+  protected boolean waitUntilFlushedOnRemoteMembers(PartitionedRegion pr) 
throws Throwable {
+    boolean result = true;
+    DM dm = this.sender.getDistributionManager();
+    Set<InternalDistributedMember> recipients = 
pr.getRegionAdvisor().adviseDataStore();
+    if (!recipients.isEmpty()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "WaitUntilParallelGatewaySenderFlushedCoordinator: About to send 
message recipients="
+                + recipients);
+      }
+      WaitUntilGatewaySenderFlushedReplyProcessor processor =
+          new WaitUntilGatewaySenderFlushedReplyProcessor(dm, recipients);
+      WaitUntilGatewaySenderFlushedMessage message = new 
WaitUntilGatewaySenderFlushedMessage(
+          recipients, processor.getProcessorId(), this.sender.getId(), 
this.timeout, this.unit);
+      dm.putOutgoing(message);
+      if (logger.isDebugEnabled()) {
+        logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Sent 
message recipients="
+            + recipients);
+      }
+      try {
+        processor.waitForReplies();
+        result = processor.getCombinedResult();
+      } catch (ReplyException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: 
Caught e=" + e
+              + "; cause=" + e.getCause());
+        }
+        throw e.getCause();
+      } catch (InterruptedException e) {
+        dm.getCancelCriterion().checkCancelInProgress(e);
+        Thread.currentThread().interrupt();
+        result = false;
+      }
+    }
+    return result;
+  }
+
+  public static class WaitUntilBucketRegionQueueFlushedCallable implements 
Callable<Boolean> {
+
+    private BucketRegionQueue brq;
+
+    private long timeout;
+
+    private TimeUnit unit;
+
+    public WaitUntilBucketRegionQueueFlushedCallable(BucketRegionQueue brq, 
long timeout,
+        TimeUnit unit) {
+      this.brq = brq;
+      this.timeout = timeout;
+      this.unit = unit;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      return this.brq.waitUntilFlushed(this.timeout, this.unit);
+    }
+
+    @Override
+    public String toString() {
+      return new 
StringBuilder().append(getClass().getSimpleName()).append("[").append("brq=")
+          .append(this.brq.getId()).append("]").toString();
+    }
+  }
+
+  public static class WaitUntilGatewaySenderFlushedReplyProcessor extends 
ReplyProcessor21 {
+
+    private Map<DistributedMember, Boolean> responses;
+
+    public WaitUntilGatewaySenderFlushedReplyProcessor(DM dm, Collection 
initMembers) {
+      super(dm, initMembers);
+      initializeResponses();
+    }
+
+    private void initializeResponses() {
+      this.responses = new ConcurrentHashMap<>();
+      for (InternalDistributedMember member : getMembers()) {
+        this.responses.put(member, false);
+      }
+    }
+
+    @Override
+    public void process(DistributionMessage msg) {
+      try {
+        if (msg instanceof ReplyMessage) {
+          ReplyMessage reply = (ReplyMessage) msg;
+          if (logger.isDebugEnabled()) {
+            logger
+                .debug("WaitUntilGatewaySenderFlushedReplyProcessor: 
Processing reply from sender="
+                    + reply.getSender() + "; returnValue=" + 
reply.getReturnValue() + "; exception="
+                    + reply.getException());
+          }
+          if (reply.getException() == null) {
+            this.responses.put(reply.getSender(), (Boolean) 
reply.getReturnValue());
+          } else {
+            reply.getException().printStackTrace();
+          }
+        }
+      } finally {
+        super.process(msg);
+      }
+    }
+
+    public boolean getCombinedResult() {
+      boolean combinedResult = true;
+      for (boolean singleMemberResult : this.responses.values()) {
+        combinedResult = combinedResult && singleMemberResult;
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("WaitUntilGatewaySenderFlushedReplyProcessor: Returning 
combinedResult="
+            + combinedResult);
+      }
+      return combinedResult;
+    }
+  }
+
+  public static class WaitUntilGatewaySenderFlushedMessage extends 
PooledDistributionMessage
+      implements MessageWithReply {
+
+    private int processorId;
+
+    private String gatewaySenderId;
+
+    private long timeout;
+
+    private TimeUnit unit;
+
+    /* For serialization */
+    public WaitUntilGatewaySenderFlushedMessage() {}
+
+    protected WaitUntilGatewaySenderFlushedMessage(Collection recipients, int 
processorId,
+        String gatewaySenderId, long timeout, TimeUnit unit) {
+      super();
+      setRecipients(recipients);
+      this.processorId = processorId;
+      this.gatewaySenderId = gatewaySenderId;
+      this.timeout = timeout;
+      this.unit = unit;
+    }
+
+    @Override
+    protected void process(DistributionManager dm) {
+      boolean result = false;
+      ReplyException replyException = null;
+      try {
+        if (logger.isDebugEnabled()) {
+          logger.debug("WaitUntilGatewaySenderFlushedMessage: Processing 
gatewaySenderId="
+              + this.gatewaySenderId + "; timeout=" + this.timeout + "; unit=" 
+ this.unit);
+        }
+        Cache cache = GemFireCacheImpl.getInstance();
+        if (cache != null) {
+          AbstractGatewaySender sender =
+              (AbstractGatewaySender) 
cache.getGatewaySender(this.gatewaySenderId);
+          if (sender != null) {
+            try {
+              WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+                  new WaitUntilParallelGatewaySenderFlushedCoordinator(sender, 
this.timeout,
+                      this.unit, false);
+              result = coordinator.waitUntilFlushed();
+            } catch (Throwable e) {
+              replyException = new ReplyException(e);
+            }
+          }
+        }
+      } finally {
+        ReplyMessage replyMsg = new ReplyMessage();
+        replyMsg.setRecipient(getSender());
+        replyMsg.setProcessorId(this.processorId);
+        if (replyException == null) {
+          replyMsg.setReturnValue(result);
+        } else {
+          replyMsg.setException(replyException);
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("WaitUntilGatewaySenderFlushedMessage: Sending reply 
returnValue="
+              + replyMsg.getReturnValue() + "; exception=" + 
replyMsg.getException());
+        }
+        dm.putOutgoing(replyMsg);
+      }
+    }
+
+    @Override
+    public int getDSFID() {
+      return WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      out.writeInt(this.processorId);
+      DataSerializer.writeString(this.gatewaySenderId, out);
+      out.writeLong(this.timeout);
+      DataSerializer.writeEnum(this.unit, out);
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      super.fromData(in);
+      this.processorId = in.readInt();
+      this.gatewaySenderId = DataSerializer.readString(in);
+      this.timeout = in.readLong();
+      this.unit = DataSerializer.readEnum(TimeUnit.class, in);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java 
b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
index c616031..1c214e4 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java
@@ -7665,6 +7665,15 @@ public class LocalizedStrings {
       new StringId(6647,
           "Cannot create GatewaySender {0} because the maximum ({1}) has been 
reached");
 
+  public static final StringId 
AbstractGatewaySender_WAIT_UNTIL_FLUSHED_NOT_SUPPORTED_FOR_SERIAL_SENDERS =
+      new StringId(6648, "waitUntilFlushed is not currently supported for 
serial gateway senders");
+  public static final StringId 
AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETRYING =
+      new StringId(6649,
+          "Caught the following exception attempting waitUntilFlushed and will 
retry:");
+  public static final StringId 
AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETURNING =
+      new StringId(6650,
+          "Caught the following exception attempting waitUntilFlushed and will 
return:");
+
   /** Testing strings, messageId 90000-99999 **/
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java
new file mode 100644
index 0000000..17696bb
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.mockito.Mockito.*;
+
+public abstract class WaitUntilGatewaySenderFlushedCoordinatorJUnitTest {
+
+  protected GemFireCacheImpl cache;
+
+  protected AbstractGatewaySender sender;
+
+  @Before
+  public void setUp() {
+    createCache();
+    createGatewaySender();
+  }
+
+  @After
+  public void tearDown() {
+    if (this.cache != null) {
+      this.cache.close();
+    }
+  }
+
+  private void createCache() {
+    this.cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0")
+        .set(LOG_LEVEL, "warning").create();
+  }
+
+  protected void createGatewaySender() {
+    this.sender = spy(AbstractGatewaySender.class);
+    this.sender.cache = this.cache;
+    this.sender.eventProcessor = getEventProcessor();
+  }
+
+  protected RegionQueue getQueue() {
+    return this.sender.eventProcessor.getQueue();
+  }
+
+  protected abstract AbstractGatewaySenderEventProcessor getEventProcessor();
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
new file mode 100644
index 0000000..c8b8ba1
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import 
org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinatorJUnitTest;
+import 
org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator.WaitUntilBucketRegionQueueFlushedCallable;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@Category(IntegrationTest.class)
+public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest
+    extends WaitUntilGatewaySenderFlushedCoordinatorJUnitTest {
+
+  private PartitionedRegion region;
+
+  protected void createGatewaySender() {
+    super.createGatewaySender();
+    ConcurrentParallelGatewaySenderQueue queue =
+        (ConcurrentParallelGatewaySenderQueue) spy(getQueue());
+    doReturn(queue).when(this.sender).getQueue();
+    this.region = mock(PartitionedRegion.class);
+    doReturn(this.region).when(queue).getRegion();
+  }
+
+  protected AbstractGatewaySenderEventProcessor getEventProcessor() {
+    ConcurrentParallelGatewaySenderEventProcessor processor =
+        spy(new ConcurrentParallelGatewaySenderEventProcessor(this.sender));
+    return processor;
+  }
+
+  @Test
+  public void 
testWaitUntilParallelGatewaySenderFlushedSuccessfulNotInitiator() throws 
Throwable {
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
+            TimeUnit.MILLISECONDS, false);
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
+    doReturn(getSuccessfulCallables()).when(coordinatorSpy)
+        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    boolean result = coordinatorSpy.waitUntilFlushed();
+    assertTrue(result);
+  }
+
+  @Test
+  public void 
testWaitUntilParallelGatewaySenderFlushedUnsuccessfulNotInitiator() throws 
Throwable {
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
+            TimeUnit.MILLISECONDS, false);
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
+    doReturn(getUnsuccessfulCallables()).when(coordinatorSpy)
+        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    boolean result = coordinatorSpy.waitUntilFlushed();
+    assertFalse(result);
+  }
+
+  @Test
+  public void testWaitUntilParallelGatewaySenderFlushedSuccessfulInitiator() 
throws Throwable {
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
+            TimeUnit.MILLISECONDS, true);
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
+    doReturn(getSuccessfulCallables()).when(coordinatorSpy)
+        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(true).when(coordinatorSpy).waitUntilFlushedOnRemoteMembers(this.region);
+    boolean result = coordinatorSpy.waitUntilFlushed();
+    assertTrue(result);
+  }
+
+  @Test
+  public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulInitiator() 
throws Throwable {
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinator =
+        new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 
1000l,
+            TimeUnit.MILLISECONDS, true);
+    WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = 
spy(coordinator);
+    doReturn(getSuccessfulCallables()).when(coordinatorSpy)
+        .buildWaitUntilBucketRegionQueueFlushedCallables(this.region);
+    
doReturn(false).when(coordinatorSpy).waitUntilFlushedOnRemoteMembers(this.region);
+    boolean result = coordinatorSpy.waitUntilFlushed();
+    assertFalse(result);
+  }
+
+  private List<WaitUntilBucketRegionQueueFlushedCallable> 
getSuccessfulCallables()
+      throws Exception {
+    List callables = new ArrayList();
+    WaitUntilBucketRegionQueueFlushedCallable callable =
+        mock(WaitUntilBucketRegionQueueFlushedCallable.class);
+    when(callable.call()).thenReturn(true);
+    callables.add(callable);
+    return callables;
+  }
+
+  private List<WaitUntilBucketRegionQueueFlushedCallable> 
getUnsuccessfulCallables()
+      throws Exception {
+    List callables = new ArrayList();
+    WaitUntilBucketRegionQueueFlushedCallable callable =
+        mock(WaitUntilBucketRegionQueueFlushedCallable.class);
+    when(callable.call()).thenReturn(false);
+    callables.add(callable);
+    return callables;
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 3f90ca4..0a791c4 100644
--- 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -2134,3 +2134,7 @@ org/apache/geode/redis/internal/DoubleWrapper,2
 fromData,9,2a2bb80004b50002b1
 toData,9,2ab400022bb80003b1
 
+org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator$WaitUntilGatewaySenderFlushedMessage,2
+fromData,47,2a2bb700322a2bb900330100b500032a2bb80034b500042a2bb900350100b500052a12362bb80037c00036b50006b1
+toData,42,2a2bb7002d2b2ab40003b9002e02002ab400042bb8002f2b2ab40005b9003003002ab400062bb80031b1
+

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java
index f423c74..802b21a 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java
@@ -16,6 +16,7 @@
 package org.apache.geode.cache.lucene;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.Analyzer;
 
@@ -57,10 +58,12 @@ public interface LuceneIndex {
   /*
    * wait until the current entries in cache are indexed
    * 
-   * @param maxWaitInMilliseconds max wait time in millisecond
+   * @param timeout max wait time
    * 
-   * @return if entries are flushed within maxWait
+   * @param unit granularity of the timeout
+   *
+   * @return if entries are flushed within timeout
    */
-  public boolean waitUntilFlushed(int maxWaitInMillisecond);
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException;
 
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
index 03d85c3..0b6685d 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java
@@ -30,9 +30,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
-import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
-import org.apache.geode.cache.lucene.internal.filesystem.File;
-import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
 import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -92,29 +90,15 @@ public abstract class LuceneIndexImpl implements 
InternalLuceneIndex {
   }
 
   @Override
-  public boolean waitUntilFlushed(int maxWaitInMillisecond) {
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws 
InterruptedException {
     String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
-    AsyncEventQueue queue = (AsyncEventQueue) cache.getAsyncEventQueue(aeqId);
-    boolean flushed = false;
+    AsyncEventQueueImpl queue = (AsyncEventQueueImpl) 
cache.getAsyncEventQueue(aeqId);
     if (queue != null) {
-      long start = System.nanoTime();
-      while (System.nanoTime() - start < 
TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) {
-        if (0 == queue.size()) {
-          flushed = true;
-          break;
-        } else {
-          try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
-          }
-        }
-      }
+      return queue.waitUntilFlushed(timeout, unit);
     } else {
       throw new IllegalArgumentException(
           "The AEQ does not exist for the index " + indexName + " region " + 
regionPath);
     }
-
-    return flushed;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java
index 67b7392..030dddd 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java
@@ -16,6 +16,7 @@
 package org.apache.geode.cache.lucene.internal.xml;
 
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
@@ -101,7 +102,7 @@ public class LuceneIndexCreation implements LuceneIndex, 
Extension<Region<?, ?>>
   }
 
   @Override
-  public boolean waitUntilFlushed(int maxWaitInMillisecond) {
+  public boolean waitUntilFlushed(long timeout, TimeUnit unit) {
     return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
index 1feba06..644ed78 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
@@ -52,7 +52,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     region.put("object-4", new TestObject("hello world", "hello world"));
 
     LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     LuceneQuery query = 
luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
         "description:\"hello world\"", DEFAULT_FIELD);
     PageableLuceneQueryResults<Integer, TestObject> results = 
query.findPages();
@@ -61,7 +61,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     // begin transaction
     cache.getCacheTransactionManager().begin();
     region.put("object-1", new TestObject("title 1", "updated"));
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     assertEquals(3, query.findPages().size());
   }
 
@@ -76,7 +76,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     region.put("object-4", new TestObject("hello world", "hello world"));
 
     LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     LuceneQuery query = 
luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
         "description:\"hello world\"", DEFAULT_FIELD);
     PageableLuceneQueryResults<Integer, TestObject> results = 
query.findPages();
@@ -85,7 +85,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     cache.getCacheTransactionManager().begin();
     region.put("object-1", new TestObject("title 1", "updated"));
     cache.getCacheTransactionManager().commit();
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
 
     assertEquals(2, query.findPages().size());
   }
@@ -101,7 +101,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     region.put("object-4", new TestObject("hello world", "hello world"));
 
     LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     LuceneQuery query = 
luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
         "description:\"hello world\"", DEFAULT_FIELD);
     PageableLuceneQueryResults<Integer, TestObject> results = 
query.findPages();
@@ -110,7 +110,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     cache.getCacheTransactionManager().begin();
     region.put("object-1", new TestObject("title 1", "updated"));
     cache.getCacheTransactionManager().rollback();
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
 
     assertEquals(3, query.findPages().size());
   }
@@ -127,7 +127,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
 
     LuceneIndexForPartitionedRegion index =
         (LuceneIndexForPartitionedRegion) luceneService.getIndex(INDEX_NAME, 
REGION_NAME);
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
 
     FileSystemStats fileSystemStats = index.getFileSystemStats();
     LuceneIndexStats indexStats = index.getIndexStats();
@@ -154,7 +154,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
 
     LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
     // Wait for events to be flushed from AEQ.
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     // Execute query to fetch all the values for "description" field.
     LuceneQuery query = 
luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
         "description:\"hello world\"", DEFAULT_FIELD);
@@ -173,7 +173,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     region.put(113, new TestObject("hello world", "hello world"));
     LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
     // Wait for events to be flushed from AEQ.
-    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME);
+    index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS);
     // Execute query to fetch all the values for "description" field.
     LuceneQuery query = 
luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
         "description:\"hello world\"", DEFAULT_FIELD);
@@ -182,7 +182,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
   }
 
   @Test
-  public void entriesFlushedToIndexAfterWaitForFlushCalled() {
+  public void entriesFlushedToIndexAfterWaitForFlushCalled() throws 
InterruptedException {
     luceneService.createIndex(INDEX_NAME, REGION_NAME, "title", "description");
 
     Region region = createRegion(REGION_NAME, RegionShortcut.PARTITION);
@@ -194,9 +194,9 @@ public class LuceneIndexMaintenanceIntegrationTest extends 
LuceneIntegrationTest
     region.put("object-4", new TestObject("hello world", "hello world"));
 
     LuceneIndexImpl index = (LuceneIndexImpl) 
luceneService.getIndex(INDEX_NAME, REGION_NAME);
-    assertFalse(index.waitUntilFlushed(500));
+    assertFalse(index.waitUntilFlushed(500, TimeUnit.MILLISECONDS));
     LuceneTestUtilities.resumeSender(cache);
-    assertTrue(index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME));
+    assertTrue(index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, 
TimeUnit.MILLISECONDS));
     assertEquals(4, index.getIndexStats().getCommits());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java
index 7c11d7c..a56fff7 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.geode.cache.Cache;
@@ -115,7 +116,7 @@ public abstract class LuceneQueriesBase extends 
LuceneDUnitTest {
       LuceneService service = LuceneServiceProvider.get(cache);
       LuceneIndexImpl index = (LuceneIndexImpl) service.getIndex(INDEX_NAME, 
REGION_NAME);
 
-      return index.waitUntilFlushed(ms);
+      return index.waitUntilFlushed(ms, TimeUnit.MILLISECONDS);
     });
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
index 6cbb1fc..d11ea91 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.lucene.analysis.Analyzer;
@@ -89,7 +90,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     // <field1:one two three> <field2:one two three>
     // <field1:one@three> <field2:one@three>
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     // standard analyzer with double quote
     // this query string will be parsed as "one three"
@@ -139,7 +140,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     region.put("primitiveInt2", 223);
     region.put("primitiveInt3", 224);
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
     verifyQueryUsingCustomizedProvider(LuceneService.REGION_VALUE_FIELD, 123, 
223, "primitiveInt1",
         "primitiveInt2");
   }
@@ -158,14 +159,14 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     region.put("primitiveInt2", 223);
     region.put("primitiveInt3", 224);
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     // Note: current QueryParser cannot query by range. It's a known issue in 
lucene
     verifyQuery(LuceneService.REGION_VALUE_FIELD + ":[123 TO 223]",
         LuceneService.REGION_VALUE_FIELD);
 
     region.put("primitiveDouble1", 123.0);
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     thrown.expectMessage("java.lang.IllegalArgumentException");
     verifyQueryUsingCustomizedProvider(LuceneService.REGION_VALUE_FIELD, 123, 
223, "primitiveInt1",
@@ -202,7 +203,8 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     assertEquals(region.values(), new HashSet(query.findValues()));
   }
 
-  private LuceneQuery<Object, Object> addValuesAndCreateQuery(int pagesize) {
+  private LuceneQuery<Object, Object> addValuesAndCreateQuery(int pagesize)
+      throws InterruptedException {
     luceneService.createIndex(INDEX_NAME, REGION_NAME, "field1", "field2");
     region = 
cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
     final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
@@ -219,7 +221,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     region.put("F", new TestObject(value3, value3));
     region.put("G", new TestObject(value1, value2));
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
     return 
luceneService.createLuceneQueryFactory().setPageSize(pagesize).create(INDEX_NAME,
         REGION_NAME, "one", "field1");
   }
@@ -244,7 +246,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     region.put("C", new TestObject(value3, value3));
     region.put("D", new TestObject(value4, value4));
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     verifyQuery("field1:one AND field2:two_four", DEFAULT_FIELD, "A");
     verifyQuery("field1:one AND field2:two", DEFAULT_FIELD, "A");
@@ -263,7 +265,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     // Put two values with some of the same tokens
     String value1 = "one three";
     region.put("A", new TestObject(value1, null));
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     verifyQuery("field1:one", DEFAULT_FIELD, "A");
   }
@@ -282,7 +284,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     PdxInstance pdx1 = insertAJson(region, "jsondoc1");
     PdxInstance pdx2 = insertAJson(region, "jsondoc2");
     PdxInstance pdx10 = insertAJson(region, "jsondoc10");
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     HashMap expectedResults = new HashMap();
     expectedResults.put("jsondoc1", pdx1);
@@ -297,7 +299,7 @@ public class LuceneQueriesIntegrationTest extends 
LuceneIntegrationTest {
     final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
 
     region.put("A", "one three");
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     verifyQuery("one", LuceneService.REGION_VALUE_FIELD, "A");
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java
index c83763f..5ec09f1 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java
@@ -18,6 +18,7 @@ import static 
org.apache.geode.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAlgorithm;
@@ -83,7 +84,7 @@ public class LuceneQueriesPersistenceIntegrationTest extends 
LuceneIntegrationTe
     value = new Type1("lucene world", 1, 2L, 3.0, 4.0f);
     userRegion.put("value3", value);
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + 
".files");
     assertNotNull(fileRegion);

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
index 17fbcfc..aaa6dbd 100755
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.cache.lucene.internal;
 
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -21,6 +22,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import static org.mockito.Mockito.*;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.geode.cache.Cache;
@@ -49,13 +51,13 @@ public class LuceneIndexImplJUnitTest {
   @Test
   public void waitUnitFlushedWithMissingAEQThrowsIllegalArgument() throws 
Exception {
     thrown.expect(IllegalArgumentException.class);
-    index.waitUntilFlushed(MAX_WAIT);
+    index.waitUntilFlushed(MAX_WAIT, TimeUnit.MILLISECONDS);
   }
 
   @Test
   public void waitUnitFlushedWaitsForFlush() throws Exception {
     final String expectedIndexName = 
LuceneServiceImpl.getUniqueIndexName(INDEX, REGION);
-    final AsyncEventQueue queue = mock(AsyncEventQueue.class);
+    final AsyncEventQueueImpl queue = mock(AsyncEventQueueImpl.class);
     when(cache.getAsyncEventQueue(eq(expectedIndexName))).thenReturn(queue);
 
     AtomicInteger callCount = new AtomicInteger();
@@ -69,7 +71,7 @@ public class LuceneIndexImplJUnitTest {
         return 0;
       }
     });
-    index.waitUntilFlushed(MAX_WAIT);
+    index.waitUntilFlushed(MAX_WAIT, TimeUnit.MILLISECONDS);
     verify(cache).getAsyncEventQueue(eq(expectedIndexName));
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
index 10127ac..d194081 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.junit.Assert.assertNotNull;
@@ -73,7 +74,8 @@ public class LuceneIndexRecoveryHAIntegrationTest {
    * construct index. This test simulates the same.
    */
   // @Test
-  public void recoverRepoInANewNode() throws BucketNotFoundException, 
IOException {
+  public void recoverRepoInANewNode()
+      throws BucketNotFoundException, IOException, InterruptedException {
     LuceneServiceImpl service = (LuceneServiceImpl) 
LuceneServiceProvider.get(cache);
     service.createIndex("index1", "/userRegion", indexedFields);
     PartitionAttributes<String, String> attrs =
@@ -87,7 +89,7 @@ public class LuceneIndexRecoveryHAIntegrationTest {
         (LuceneIndexForPartitionedRegion) service.getIndex("index1", 
"/userRegion");
     // put an entry to create the bucket
     userRegion.put("rebalance", "test");
-    index.waitUntilFlushed(30000);
+    index.waitUntilFlushed(30000, TimeUnit.MILLISECONDS);
 
     RepositoryManager manager = new 
PartitionedRepositoryManager((LuceneIndexImpl) index, mapper);
     IndexRepository repo = manager.getRepository(userRegion, 0, null);
@@ -112,9 +114,10 @@ public class LuceneIndexRecoveryHAIntegrationTest {
 
 
 
-  private void verifyIndexFinishFlushing(String indexName, String regionName) {
+  private void verifyIndexFinishFlushing(String indexName, String regionName)
+      throws InterruptedException {
     LuceneIndex index = LuceneServiceProvider.get(cache).getIndex(indexName, 
regionName);
-    boolean flushed = index.waitUntilFlushed(60000);
+    boolean flushed = index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
     assertTrue(flushed);
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java
index f0f1579..1c8bc88 100755
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java
@@ -610,7 +610,8 @@ public class LuceneIndexCommandsDUnitTest extends 
CliCommandTestBase {
       LuceneService luceneService = LuceneServiceProvider.get(getCache());
       Region region = getCache().getRegion(REGION_NAME);
       region.putAll(entries);
-      luceneService.getIndex(INDEX_NAME, REGION_NAME).waitUntilFlushed(60000);
+      luceneService.getIndex(INDEX_NAME, REGION_NAME).waitUntilFlushed(60000,
+          TimeUnit.MILLISECONDS);
       LuceneIndexImpl index = (LuceneIndexImpl) 
luceneService.getIndex(INDEX_NAME, REGION_NAME);
       Awaitility.await().atMost(65, TimeUnit.SECONDS)
           .until(() -> assertEquals(countOfDocuments, 
index.getIndexStats().getDocuments()));

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java
index 697166e..5c47e13 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java
@@ -18,6 +18,7 @@ import static 
org.apache.geode.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
@@ -55,7 +56,7 @@ public class DumpDirectoryFilesIntegrationTest extends 
LuceneIntegrationTest {
     InternalLuceneIndex index =
         (InternalLuceneIndex) luceneService.getIndex(INDEX_NAME, REGION_NAME);
 
-    index.waitUntilFlushed(60000);
+    index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
 
     index.dumpFiles(diskDirRule.get().getAbsolutePath());
 

http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
----------------------------------------------------------------------
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
index d8816a3..21c0bbc 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.geode.cache.Cache;
@@ -86,10 +87,11 @@ public class LuceneTestUtilities {
     LuceneServiceProvider.get(cache).createIndex(INDEX_NAME, REGION_NAME, 
fieldNames);
   }
 
-  public static void verifyIndexFinishFlushing(Cache cache, String indexName, 
String regionName) {
+  public static void verifyIndexFinishFlushing(Cache cache, String indexName, 
String regionName)
+      throws InterruptedException {
     LuceneService luceneService = LuceneServiceProvider.get(cache);
     LuceneIndex index = luceneService.getIndex(indexName, regionName);
-    boolean flushed = index.waitUntilFlushed(60000);
+    boolean flushed = index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS);
     assertTrue(flushed);
   }
 

Reply via email to