GEODE-2230: Added AsyncEventQueue and GatewaySender waitUntilFlushed API
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/44cd72d8 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/44cd72d8 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/44cd72d8 Branch: refs/heads/master Commit: 44cd72d8502d278a9a328a2b0a825c2460f8a383 Parents: a15daf0 Author: Barry Oglesby <bogle...@pivotal.io> Authored: Thu Jan 19 17:07:09 2017 -0800 Committer: Barry Oglesby <bogle...@pivotal.io> Committed: Tue Jan 24 16:52:37 2017 -0800 ---------------------------------------------------------------------- .../internal/AsyncEventQueueImpl.java | 5 + .../org/apache/geode/internal/DSFIDFactory.java | 6 +- .../geode/internal/DataSerializableFixedID.java | 2 + .../geode/internal/cache/BucketRegionQueue.java | 43 +++ .../cache/wan/AbstractGatewaySender.java | 47 ++- ...aitUntilGatewaySenderFlushedCoordinator.java | 43 +++ ...ParallelGatewaySenderFlushedCoordinator.java | 323 +++++++++++++++++++ .../geode/internal/i18n/LocalizedStrings.java | 9 + ...atewaySenderFlushedCoordinatorJUnitTest.java | 62 ++++ ...atewaySenderFlushedCoordinatorJUnitTest.java | 126 ++++++++ .../sanctionedDataSerializables.txt | 4 + .../apache/geode/cache/lucene/LuceneIndex.java | 9 +- .../cache/lucene/internal/LuceneIndexImpl.java | 24 +- .../internal/xml/LuceneIndexCreation.java | 3 +- .../LuceneIndexMaintenanceIntegrationTest.java | 24 +- .../geode/cache/lucene/LuceneQueriesBase.java | 3 +- .../lucene/LuceneQueriesIntegrationTest.java | 22 +- ...LuceneQueriesPersistenceIntegrationTest.java | 3 +- .../internal/LuceneIndexImplJUnitTest.java | 8 +- .../LuceneIndexRecoveryHAIntegrationTest.java | 11 +- .../cli/LuceneIndexCommandsDUnitTest.java | 3 +- .../DumpDirectoryFilesIntegrationTest.java | 3 +- .../cache/lucene/test/LuceneTestUtilities.java | 6 +- 23 files changed, 722 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java index 2eb53be..3b99f1c 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java @@ -17,6 +17,7 @@ package org.apache.geode.cache.asyncqueue.internal; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; @@ -202,4 +203,8 @@ public class AsyncEventQueueImpl implements AsyncEventQueue { public boolean isForwardExpirationDestroy() { return ((AbstractGatewaySender) this.sender).isForwardExpirationDestroy(); } + + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { + return ((AbstractGatewaySender) this.sender).waitUntilFlushed(timeout, unit); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index e13a2ad..bb29239 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -23,7 +23,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.NotSerializableException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -76,7 +75,6 @@ import org.apache.geode.cache.query.internal.types.StructTypeImpl; import org.apache.geode.distributed.internal.DistributionAdvisor; import org.apache.geode.distributed.internal.HighPriorityAckedMessage; import org.apache.geode.distributed.internal.ReplyMessage; -import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.distributed.internal.SerialAckedMessage; import org.apache.geode.distributed.internal.ShutdownMessage; import org.apache.geode.distributed.internal.StartupMessage; @@ -393,11 +391,11 @@ import org.apache.geode.internal.cache.versions.VMVersionTag; import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor; import org.apache.geode.internal.cache.wan.GatewaySenderEventCallbackArgument; import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; +import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueBatchRemovalMessage.BatchRemovalReplyMessage; import org.apache.geode.internal.cache.wan.parallel.ParallelQueueRemovalMessage; import org.apache.geode.internal.cache.wan.serial.BatchDestroyOperation; -import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.management.internal.JmxManagerAdvisor.JmxManagerProfile; import org.apache.geode.management.internal.JmxManagerAdvisor.JmxManagerProfileMessage; import org.apache.geode.management.internal.JmxManagerLocatorRequest; @@ -924,6 +922,8 @@ public final class DSFIDFactory implements DataSerializableFixedID { registerDSFID(PR_DESTROY_ON_DATA_STORE_MESSAGE, DestroyRegionOnDataStoreMessage.class); registerDSFID(SHUTDOWN_ALL_GATEWAYHUBS_REQUEST, ShutdownAllGatewayHubsRequest.class); registerDSFID(BUCKET_COUNT_LOAD_PROBE, BucketCountLoadProbe.class); + registerDSFID(WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE, + WaitUntilParallelGatewaySenderFlushedCoordinator.WaitUntilGatewaySenderFlushedMessage.class); } /** http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java index 24b452d..84eb8e9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java @@ -810,6 +810,8 @@ public interface DataSerializableFixedID extends SerializationVersions { public static final short LUCENE_TOP_ENTRIES = 2175; public static final short LUCENE_TOP_ENTRIES_COLLECTOR = 2176; + public static final short WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE = 2177; + // NOTE, codes > 65535 will take 4 bytes to serialize /** http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 7fea789..4830912 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -30,6 +30,8 @@ import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; @@ -78,6 +80,10 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { private long lastKeyRecovered; + private AtomicLong latestQueuedKey = new AtomicLong(); + + private AtomicLong latestAcknowledgedKey = new AtomicLong(); + /** * @param regionName * @param attrs @@ -437,6 +443,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { if (didPut) { if (this.initialized) { this.eventSeqNumQueue.add(key); + updateLargestQueuedKey((Long) key); } if (logger.isDebugEnabled()) { logger.debug("Put successfully in the queue : {} was initialized: {}", @@ -448,6 +455,41 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } } + private void updateLargestQueuedKey(Long key) { + Atomics.setIfGreater(this.latestQueuedKey, key); + } + + private void setLatestAcknowledgedKey(Long key) { + this.latestAcknowledgedKey.set(key); + } + + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { + if (logger.isDebugEnabled()) { + logger.debug("BucketRegionQueue: waitUntilFlushed bucket=" + getId() + "; time=" + + System.currentTimeMillis() + "; timeout=" + timeout + "; unit=" + unit); + } + boolean result = false; + // Wait until latestAcknowledgedKey > latestQueuedKey or the queue is empty + if (this.initialized) { + long latestQueuedKeyToCheck = this.latestQueuedKey.get(); + long nanosRemaining = unit.toNanos(timeout); + long endTime = System.nanoTime() + nanosRemaining; + while (nanosRemaining > 0) { + if (latestAcknowledgedKey.get() > latestQueuedKeyToCheck || isEmpty()) { + result = true; + break; + } + Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(nanosRemaining) + 1, 100)); + nanosRemaining = endTime - System.nanoTime(); + } + } + if (logger.isDebugEnabled()) { + logger.debug("BucketRegionQueue: waitUntilFlushed completed bucket=" + getId() + "; time=" + + System.currentTimeMillis() + "; result=" + result); + } + return result; + } + /** * It removes the first key from the queue. * @@ -503,6 +545,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { event.setEventId(new EventID(cache.getSystem())); event.setRegion(this); basicDestroy(event, true, null); + setLatestAcknowledgedKey((Long) key); checkReadiness(); } catch (EntryNotFoundException enf) { if (getPartitionedRegion().isDestroyed()) { http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 2c9a65d..1f8704c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -22,9 +22,13 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.geode.InternalGemFireError; +import org.apache.geode.internal.cache.execute.BucketMovedException; import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelCriterion; @@ -33,15 +37,12 @@ import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.DataPolicy; -import org.apache.geode.cache.Operation; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.RegionExistsException; import org.apache.geode.cache.Scope; import org.apache.geode.cache.asyncqueue.AsyncEventListener; -import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; -import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueStats; import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.wan.GatewayEventFilter; @@ -675,7 +676,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi } } - final public RegionQueue getQueue() { + public RegionQueue getQueue() { if (this.eventProcessor != null) { if (!(this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor)) { return this.eventProcessor.getQueue(); @@ -1089,7 +1090,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return substituteValue; } - private void initializeEventIdIndex() { + protected void initializeEventIdIndex() { final boolean isDebugEnabled = logger.isDebugEnabled(); boolean gotLock = false; @@ -1240,6 +1241,42 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi return lifeCycleLock; } + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { + int attempts = 0; + boolean result = false; + if (isParallel()) { + // Wait until the sender is flushed. Retry if necessary. + while (true) { + try { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(this, timeout, unit, true); + result = coordinator.waitUntilFlushed(); + break; + } catch (BucketMovedException | CancelException | RegionDestroyedException e) { + attempts++; + logger.warn( + LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETRYING + .toLocalizedString(), + e); + Thread.sleep(100); + } catch (Throwable t) { + attempts++; + logger.warn( + LocalizedStrings.AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETURNING + .toLocalizedString(), + t); + throw new InternalGemFireError(t); + } + } + return result; + } else { + // Serial senders are currently not supported + throw new UnsupportedOperationException( + LocalizedStrings.AbstractGatewaySender_WAIT_UNTIL_FLUSHED_NOT_SUPPORTED_FOR_SERIAL_SENDERS + .toLocalizedString()); + } + } + /** * Has a reference to a GatewayEventImpl and has a timeout value. */ http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java new file mode 100644 index 0000000..07c4e19 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import org.apache.geode.internal.logging.LogService; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.TimeUnit; + +public abstract class WaitUntilGatewaySenderFlushedCoordinator { + + protected AbstractGatewaySender sender; + + protected long timeout; + + protected TimeUnit unit; + + protected boolean initiator; + + protected static final Logger logger = LogService.getLogger(); + + public WaitUntilGatewaySenderFlushedCoordinator(AbstractGatewaySender sender, long timeout, + TimeUnit unit, boolean initiator) { + this.sender = sender; + this.timeout = timeout; + this.unit = unit; + this.initiator = initiator; + } + + public abstract boolean waitUntilFlushed() throws Throwable; +} http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java new file mode 100644 index 0000000..a4c03a9 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.Cache; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.*; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.*; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinator; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + +public class WaitUntilParallelGatewaySenderFlushedCoordinator + extends WaitUntilGatewaySenderFlushedCoordinator { + + public WaitUntilParallelGatewaySenderFlushedCoordinator(AbstractGatewaySender sender, + long timeout, TimeUnit unit, boolean initiator) { + super(sender, timeout, unit, initiator); + } + + public boolean waitUntilFlushed() throws Throwable { + boolean remoteResult = true, localResult = true; + Throwable exceptionToThrow = null; + ConcurrentParallelGatewaySenderQueue prq = + (ConcurrentParallelGatewaySenderQueue) this.sender.getQueue(); + PartitionedRegion pr = (PartitionedRegion) prq.getRegion(); + + // Create callables for local buckets + List<WaitUntilBucketRegionQueueFlushedCallable> callables = + buildWaitUntilBucketRegionQueueFlushedCallables(pr); + + // Submit local callables for execution + ExecutorService service = this.sender.getDistributionManager().getWaitingThreadPool(); + List<Future<Boolean>> callableFutures = new ArrayList<>(); + for (Callable<Boolean> callable : callables) { + callableFutures.add(service.submit(callable)); + } + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Created and submitted " + + callables.size() + " callables=" + callables); + } + + // Send message to remote buckets + if (this.initiator) { + remoteResult = false; + try { + remoteResult = waitUntilFlushedOnRemoteMembers(pr); + } catch (Throwable t) { + exceptionToThrow = t; + } + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed remote result=" + + remoteResult + "; exceptionToThrow=" + exceptionToThrow); + } + } + + // Process local future results + for (Future<Boolean> future : callableFutures) { + boolean singleBucketResult = false; + try { + singleBucketResult = future.get(); + } catch (ExecutionException e) { + exceptionToThrow = e.getCause(); + } + localResult = localResult && singleBucketResult; + } + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Processed local result=" + + localResult + "; exceptionToThrow=" + exceptionToThrow); + } + + // Return the full result + if (exceptionToThrow == null) { + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Returning full result=" + + (remoteResult && localResult)); + } + return remoteResult && localResult; + } else { + throw exceptionToThrow; + } + } + + protected List<WaitUntilBucketRegionQueueFlushedCallable> buildWaitUntilBucketRegionQueueFlushedCallables( + PartitionedRegion pr) { + List<WaitUntilBucketRegionQueueFlushedCallable> callables = new ArrayList<>(); + if (pr.isDataStore()) { + for (BucketRegion br : pr.getDataStore().getAllLocalBucketRegions()) { + callables.add(new WaitUntilBucketRegionQueueFlushedCallable((BucketRegionQueue) br, + this.timeout, this.unit)); + } + } + return callables; + } + + protected boolean waitUntilFlushedOnRemoteMembers(PartitionedRegion pr) throws Throwable { + boolean result = true; + DM dm = this.sender.getDistributionManager(); + Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseDataStore(); + if (!recipients.isEmpty()) { + if (logger.isDebugEnabled()) { + logger.debug( + "WaitUntilParallelGatewaySenderFlushedCoordinator: About to send message recipients=" + + recipients); + } + WaitUntilGatewaySenderFlushedReplyProcessor processor = + new WaitUntilGatewaySenderFlushedReplyProcessor(dm, recipients); + WaitUntilGatewaySenderFlushedMessage message = new WaitUntilGatewaySenderFlushedMessage( + recipients, processor.getProcessorId(), this.sender.getId(), this.timeout, this.unit); + dm.putOutgoing(message); + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Sent message recipients=" + + recipients); + } + try { + processor.waitForReplies(); + result = processor.getCombinedResult(); + } catch (ReplyException e) { + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilParallelGatewaySenderFlushedCoordinator: Caught e=" + e + + "; cause=" + e.getCause()); + } + throw e.getCause(); + } catch (InterruptedException e) { + dm.getCancelCriterion().checkCancelInProgress(e); + Thread.currentThread().interrupt(); + result = false; + } + } + return result; + } + + public static class WaitUntilBucketRegionQueueFlushedCallable implements Callable<Boolean> { + + private BucketRegionQueue brq; + + private long timeout; + + private TimeUnit unit; + + public WaitUntilBucketRegionQueueFlushedCallable(BucketRegionQueue brq, long timeout, + TimeUnit unit) { + this.brq = brq; + this.timeout = timeout; + this.unit = unit; + } + + @Override + public Boolean call() throws Exception { + return this.brq.waitUntilFlushed(this.timeout, this.unit); + } + + @Override + public String toString() { + return new StringBuilder().append(getClass().getSimpleName()).append("[").append("brq=") + .append(this.brq.getId()).append("]").toString(); + } + } + + public static class WaitUntilGatewaySenderFlushedReplyProcessor extends ReplyProcessor21 { + + private Map<DistributedMember, Boolean> responses; + + public WaitUntilGatewaySenderFlushedReplyProcessor(DM dm, Collection initMembers) { + super(dm, initMembers); + initializeResponses(); + } + + private void initializeResponses() { + this.responses = new ConcurrentHashMap<>(); + for (InternalDistributedMember member : getMembers()) { + this.responses.put(member, false); + } + } + + @Override + public void process(DistributionMessage msg) { + try { + if (msg instanceof ReplyMessage) { + ReplyMessage reply = (ReplyMessage) msg; + if (logger.isDebugEnabled()) { + logger + .debug("WaitUntilGatewaySenderFlushedReplyProcessor: Processing reply from sender=" + + reply.getSender() + "; returnValue=" + reply.getReturnValue() + "; exception=" + + reply.getException()); + } + if (reply.getException() == null) { + this.responses.put(reply.getSender(), (Boolean) reply.getReturnValue()); + } else { + reply.getException().printStackTrace(); + } + } + } finally { + super.process(msg); + } + } + + public boolean getCombinedResult() { + boolean combinedResult = true; + for (boolean singleMemberResult : this.responses.values()) { + combinedResult = combinedResult && singleMemberResult; + } + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilGatewaySenderFlushedReplyProcessor: Returning combinedResult=" + + combinedResult); + } + return combinedResult; + } + } + + public static class WaitUntilGatewaySenderFlushedMessage extends PooledDistributionMessage + implements MessageWithReply { + + private int processorId; + + private String gatewaySenderId; + + private long timeout; + + private TimeUnit unit; + + /* For serialization */ + public WaitUntilGatewaySenderFlushedMessage() {} + + protected WaitUntilGatewaySenderFlushedMessage(Collection recipients, int processorId, + String gatewaySenderId, long timeout, TimeUnit unit) { + super(); + setRecipients(recipients); + this.processorId = processorId; + this.gatewaySenderId = gatewaySenderId; + this.timeout = timeout; + this.unit = unit; + } + + @Override + protected void process(DistributionManager dm) { + boolean result = false; + ReplyException replyException = null; + try { + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilGatewaySenderFlushedMessage: Processing gatewaySenderId=" + + this.gatewaySenderId + "; timeout=" + this.timeout + "; unit=" + this.unit); + } + Cache cache = GemFireCacheImpl.getInstance(); + if (cache != null) { + AbstractGatewaySender sender = + (AbstractGatewaySender) cache.getGatewaySender(this.gatewaySenderId); + if (sender != null) { + try { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(sender, this.timeout, + this.unit, false); + result = coordinator.waitUntilFlushed(); + } catch (Throwable e) { + replyException = new ReplyException(e); + } + } + } + } finally { + ReplyMessage replyMsg = new ReplyMessage(); + replyMsg.setRecipient(getSender()); + replyMsg.setProcessorId(this.processorId); + if (replyException == null) { + replyMsg.setReturnValue(result); + } else { + replyMsg.setException(replyException); + } + if (logger.isDebugEnabled()) { + logger.debug("WaitUntilGatewaySenderFlushedMessage: Sending reply returnValue=" + + replyMsg.getReturnValue() + "; exception=" + replyMsg.getException()); + } + dm.putOutgoing(replyMsg); + } + } + + @Override + public int getDSFID() { + return WAIT_UNTIL_GATEWAY_SENDER_FLUSHED_MESSAGE; + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + out.writeInt(this.processorId); + DataSerializer.writeString(this.gatewaySenderId, out); + out.writeLong(this.timeout); + DataSerializer.writeEnum(this.unit, out); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.processorId = in.readInt(); + this.gatewaySenderId = DataSerializer.readString(in); + this.timeout = in.readLong(); + this.unit = DataSerializer.readEnum(TimeUnit.class, in); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index c616031..1c214e4 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -7665,6 +7665,15 @@ public class LocalizedStrings { new StringId(6647, "Cannot create GatewaySender {0} because the maximum ({1}) has been reached"); + public static final StringId AbstractGatewaySender_WAIT_UNTIL_FLUSHED_NOT_SUPPORTED_FOR_SERIAL_SENDERS = + new StringId(6648, "waitUntilFlushed is not currently supported for serial gateway senders"); + public static final StringId AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETRYING = + new StringId(6649, + "Caught the following exception attempting waitUntilFlushed and will retry:"); + public static final StringId AbstractGatewaySender_CAUGHT_EXCEPTION_ATTEMPTING_WAIT_UNTIL_FLUSHED_RETURNING = + new StringId(6650, + "Caught the following exception attempting waitUntilFlushed and will return:"); + /** Testing strings, messageId 90000-99999 **/ /** http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java new file mode 100644 index 0000000..17696bb --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/WaitUntilGatewaySenderFlushedCoordinatorJUnitTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan; + +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.internal.cache.RegionQueue; +import org.junit.After; +import org.junit.Before; + +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.mockito.Mockito.*; + +public abstract class WaitUntilGatewaySenderFlushedCoordinatorJUnitTest { + + protected GemFireCacheImpl cache; + + protected AbstractGatewaySender sender; + + @Before + public void setUp() { + createCache(); + createGatewaySender(); + } + + @After + public void tearDown() { + if (this.cache != null) { + this.cache.close(); + } + } + + private void createCache() { + this.cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0") + .set(LOG_LEVEL, "warning").create(); + } + + protected void createGatewaySender() { + this.sender = spy(AbstractGatewaySender.class); + this.sender.cache = this.cache; + this.sender.eventProcessor = getEventProcessor(); + } + + protected RegionQueue getQueue() { + return this.sender.eventProcessor.getQueue(); + } + + protected abstract AbstractGatewaySenderEventProcessor getEventProcessor(); +} http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java new file mode 100644 index 0000000..c8b8ba1 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.parallel; + +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.WaitUntilGatewaySenderFlushedCoordinatorJUnitTest; +import org.apache.geode.internal.cache.wan.parallel.WaitUntilParallelGatewaySenderFlushedCoordinator.WaitUntilBucketRegionQueueFlushedCallable; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@Category(IntegrationTest.class) +public class WaitUntilParallelGatewaySenderFlushedCoordinatorJUnitTest + extends WaitUntilGatewaySenderFlushedCoordinatorJUnitTest { + + private PartitionedRegion region; + + protected void createGatewaySender() { + super.createGatewaySender(); + ConcurrentParallelGatewaySenderQueue queue = + (ConcurrentParallelGatewaySenderQueue) spy(getQueue()); + doReturn(queue).when(this.sender).getQueue(); + this.region = mock(PartitionedRegion.class); + doReturn(this.region).when(queue).getRegion(); + } + + protected AbstractGatewaySenderEventProcessor getEventProcessor() { + ConcurrentParallelGatewaySenderEventProcessor processor = + spy(new ConcurrentParallelGatewaySenderEventProcessor(this.sender)); + return processor; + } + + @Test + public void testWaitUntilParallelGatewaySenderFlushedSuccessfulNotInitiator() throws Throwable { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, + TimeUnit.MILLISECONDS, false); + WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); + doReturn(getSuccessfulCallables()).when(coordinatorSpy) + .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + boolean result = coordinatorSpy.waitUntilFlushed(); + assertTrue(result); + } + + @Test + public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulNotInitiator() throws Throwable { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, + TimeUnit.MILLISECONDS, false); + WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); + doReturn(getUnsuccessfulCallables()).when(coordinatorSpy) + .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + boolean result = coordinatorSpy.waitUntilFlushed(); + assertFalse(result); + } + + @Test + public void testWaitUntilParallelGatewaySenderFlushedSuccessfulInitiator() throws Throwable { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, + TimeUnit.MILLISECONDS, true); + WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); + doReturn(getSuccessfulCallables()).when(coordinatorSpy) + .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(true).when(coordinatorSpy).waitUntilFlushedOnRemoteMembers(this.region); + boolean result = coordinatorSpy.waitUntilFlushed(); + assertTrue(result); + } + + @Test + public void testWaitUntilParallelGatewaySenderFlushedUnsuccessfulInitiator() throws Throwable { + WaitUntilParallelGatewaySenderFlushedCoordinator coordinator = + new WaitUntilParallelGatewaySenderFlushedCoordinator(this.sender, 1000l, + TimeUnit.MILLISECONDS, true); + WaitUntilParallelGatewaySenderFlushedCoordinator coordinatorSpy = spy(coordinator); + doReturn(getSuccessfulCallables()).when(coordinatorSpy) + .buildWaitUntilBucketRegionQueueFlushedCallables(this.region); + doReturn(false).when(coordinatorSpy).waitUntilFlushedOnRemoteMembers(this.region); + boolean result = coordinatorSpy.waitUntilFlushed(); + assertFalse(result); + } + + private List<WaitUntilBucketRegionQueueFlushedCallable> getSuccessfulCallables() + throws Exception { + List callables = new ArrayList(); + WaitUntilBucketRegionQueueFlushedCallable callable = + mock(WaitUntilBucketRegionQueueFlushedCallable.class); + when(callable.call()).thenReturn(true); + callables.add(callable); + return callables; + } + + private List<WaitUntilBucketRegionQueueFlushedCallable> getUnsuccessfulCallables() + throws Exception { + List callables = new ArrayList(); + WaitUntilBucketRegionQueueFlushedCallable callable = + mock(WaitUntilBucketRegionQueueFlushedCallable.class); + when(callable.call()).thenReturn(false); + callables.add(callable); + return callables; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt ---------------------------------------------------------------------- diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 3f90ca4..0a791c4 100644 --- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -2134,3 +2134,7 @@ org/apache/geode/redis/internal/DoubleWrapper,2 fromData,9,2a2bb80004b50002b1 toData,9,2ab400022bb80003b1 +org/apache/geode/internal/cache/wan/parallel/WaitUntilParallelGatewaySenderFlushedCoordinator$WaitUntilGatewaySenderFlushedMessage,2 +fromData,47,2a2bb700322a2bb900330100b500032a2bb80034b500042a2bb900350100b500052a12362bb80037c00036b50006b1 +toData,42,2a2bb7002d2b2ab40003b9002e02002ab400042bb8002f2b2ab40005b9003003002ab400062bb80031b1 + http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java index f423c74..802b21a 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/LuceneIndex.java @@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; @@ -57,10 +58,12 @@ public interface LuceneIndex { /* * wait until the current entries in cache are indexed * - * @param maxWaitInMilliseconds max wait time in millisecond + * @param timeout max wait time * - * @return if entries are flushed within maxWait + * @param unit granularity of the timeout + * + * @return if entries are flushed within timeout */ - public boolean waitUntilFlushed(int maxWaitInMillisecond); + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java index 03d85c3..0b6685d 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexImpl.java @@ -30,9 +30,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl; -import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey; -import org.apache.geode.cache.lucene.internal.filesystem.File; -import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.apache.geode.cache.lucene.internal.repository.RepositoryManager; import org.apache.geode.cache.lucene.internal.xml.LuceneIndexCreation; import org.apache.geode.internal.cache.GemFireCacheImpl; @@ -92,29 +90,15 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { } @Override - public boolean waitUntilFlushed(int maxWaitInMillisecond) { + public boolean waitUntilFlushed(long timeout, TimeUnit unit) throws InterruptedException { String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); - AsyncEventQueue queue = (AsyncEventQueue) cache.getAsyncEventQueue(aeqId); - boolean flushed = false; + AsyncEventQueueImpl queue = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId); if (queue != null) { - long start = System.nanoTime(); - while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) { - if (0 == queue.size()) { - flushed = true; - break; - } else { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - } - } - } + return queue.waitUntilFlushed(timeout, unit); } else { throw new IllegalArgumentException( "The AEQ does not exist for the index " + indexName + " region " + regionPath); } - - return flushed; } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java index 67b7392..030dddd 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/xml/LuceneIndexCreation.java @@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene.internal.xml; import java.util.*; +import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; @@ -101,7 +102,7 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>> } @Override - public boolean waitUntilFlushed(int maxWaitInMillisecond) { + public boolean waitUntilFlushed(long timeout, TimeUnit unit) { return true; } } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java index 1feba06..644ed78 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java @@ -52,7 +52,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest region.put("object-4", new TestObject("hello world", "hello world")); LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, "description:\"hello world\"", DEFAULT_FIELD); PageableLuceneQueryResults<Integer, TestObject> results = query.findPages(); @@ -61,7 +61,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest // begin transaction cache.getCacheTransactionManager().begin(); region.put("object-1", new TestObject("title 1", "updated")); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); assertEquals(3, query.findPages().size()); } @@ -76,7 +76,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest region.put("object-4", new TestObject("hello world", "hello world")); LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, "description:\"hello world\"", DEFAULT_FIELD); PageableLuceneQueryResults<Integer, TestObject> results = query.findPages(); @@ -85,7 +85,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest cache.getCacheTransactionManager().begin(); region.put("object-1", new TestObject("title 1", "updated")); cache.getCacheTransactionManager().commit(); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); assertEquals(2, query.findPages().size()); } @@ -101,7 +101,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest region.put("object-4", new TestObject("hello world", "hello world")); LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, "description:\"hello world\"", DEFAULT_FIELD); PageableLuceneQueryResults<Integer, TestObject> results = query.findPages(); @@ -110,7 +110,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest cache.getCacheTransactionManager().begin(); region.put("object-1", new TestObject("title 1", "updated")); cache.getCacheTransactionManager().rollback(); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); assertEquals(3, query.findPages().size()); } @@ -127,7 +127,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest LuceneIndexForPartitionedRegion index = (LuceneIndexForPartitionedRegion) luceneService.getIndex(INDEX_NAME, REGION_NAME); - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); FileSystemStats fileSystemStats = index.getFileSystemStats(); LuceneIndexStats indexStats = index.getIndexStats(); @@ -154,7 +154,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); // Wait for events to be flushed from AEQ. - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); // Execute query to fetch all the values for "description" field. LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, "description:\"hello world\"", DEFAULT_FIELD); @@ -173,7 +173,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest region.put(113, new TestObject("hello world", "hello world")); LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); // Wait for events to be flushed from AEQ. - index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME); + index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS); // Execute query to fetch all the values for "description" field. LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME, "description:\"hello world\"", DEFAULT_FIELD); @@ -182,7 +182,7 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest } @Test - public void entriesFlushedToIndexAfterWaitForFlushCalled() { + public void entriesFlushedToIndexAfterWaitForFlushCalled() throws InterruptedException { luceneService.createIndex(INDEX_NAME, REGION_NAME, "title", "description"); Region region = createRegion(REGION_NAME, RegionShortcut.PARTITION); @@ -194,9 +194,9 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest region.put("object-4", new TestObject("hello world", "hello world")); LuceneIndexImpl index = (LuceneIndexImpl) luceneService.getIndex(INDEX_NAME, REGION_NAME); - assertFalse(index.waitUntilFlushed(500)); + assertFalse(index.waitUntilFlushed(500, TimeUnit.MILLISECONDS)); LuceneTestUtilities.resumeSender(cache); - assertTrue(index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME)); + assertTrue(index.waitUntilFlushed(WAIT_FOR_FLUSH_TIME, TimeUnit.MILLISECONDS)); assertEquals(4, index.getIndexStats().getCommits()); } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java index 7c11d7c..a56fff7 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesBase.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.geode.cache.Cache; @@ -115,7 +116,7 @@ public abstract class LuceneQueriesBase extends LuceneDUnitTest { LuceneService service = LuceneServiceProvider.get(cache); LuceneIndexImpl index = (LuceneIndexImpl) service.getIndex(INDEX_NAME, REGION_NAME); - return index.waitUntilFlushed(ms); + return index.waitUntilFlushed(ms, TimeUnit.MILLISECONDS); }); } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java index 6cbb1fc..d11ea91 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesIntegrationTest.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.lucene.analysis.Analyzer; @@ -89,7 +90,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { // <field1:one two three> <field2:one two three> // <field1:one@three> <field2:one@three> - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); // standard analyzer with double quote // this query string will be parsed as "one three" @@ -139,7 +140,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { region.put("primitiveInt2", 223); region.put("primitiveInt3", 224); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); verifyQueryUsingCustomizedProvider(LuceneService.REGION_VALUE_FIELD, 123, 223, "primitiveInt1", "primitiveInt2"); } @@ -158,14 +159,14 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { region.put("primitiveInt2", 223); region.put("primitiveInt3", 224); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); // Note: current QueryParser cannot query by range. It's a known issue in lucene verifyQuery(LuceneService.REGION_VALUE_FIELD + ":[123 TO 223]", LuceneService.REGION_VALUE_FIELD); region.put("primitiveDouble1", 123.0); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); thrown.expectMessage("java.lang.IllegalArgumentException"); verifyQueryUsingCustomizedProvider(LuceneService.REGION_VALUE_FIELD, 123, 223, "primitiveInt1", @@ -202,7 +203,8 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { assertEquals(region.values(), new HashSet(query.findValues())); } - private LuceneQuery<Object, Object> addValuesAndCreateQuery(int pagesize) { + private LuceneQuery<Object, Object> addValuesAndCreateQuery(int pagesize) + throws InterruptedException { luceneService.createIndex(INDEX_NAME, REGION_NAME, "field1", "field2"); region = cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME); final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); @@ -219,7 +221,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { region.put("F", new TestObject(value3, value3)); region.put("G", new TestObject(value1, value2)); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); return luceneService.createLuceneQueryFactory().setPageSize(pagesize).create(INDEX_NAME, REGION_NAME, "one", "field1"); } @@ -244,7 +246,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { region.put("C", new TestObject(value3, value3)); region.put("D", new TestObject(value4, value4)); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); verifyQuery("field1:one AND field2:two_four", DEFAULT_FIELD, "A"); verifyQuery("field1:one AND field2:two", DEFAULT_FIELD, "A"); @@ -263,7 +265,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { // Put two values with some of the same tokens String value1 = "one three"; region.put("A", new TestObject(value1, null)); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); verifyQuery("field1:one", DEFAULT_FIELD, "A"); } @@ -282,7 +284,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { PdxInstance pdx1 = insertAJson(region, "jsondoc1"); PdxInstance pdx2 = insertAJson(region, "jsondoc2"); PdxInstance pdx10 = insertAJson(region, "jsondoc10"); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); HashMap expectedResults = new HashMap(); expectedResults.put("jsondoc1", pdx1); @@ -297,7 +299,7 @@ public class LuceneQueriesIntegrationTest extends LuceneIntegrationTest { final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); region.put("A", "one three"); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); verifyQuery("one", LuceneService.REGION_VALUE_FIELD, "A"); } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java index c83763f..5ec09f1 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesPersistenceIntegrationTest.java @@ -18,6 +18,7 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.*; import static org.junit.Assert.*; import java.io.File; +import java.util.concurrent.TimeUnit; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAlgorithm; @@ -83,7 +84,7 @@ public class LuceneQueriesPersistenceIntegrationTest extends LuceneIntegrationTe value = new Type1("lucene world", 1, 2L, 3.0, 4.0f); userRegion.put("value3", value); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + ".files"); assertNotNull(fileRegion); http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java index 17fbcfc..aaa6dbd 100755 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexImplJUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.cache.lucene.internal; +import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -21,6 +22,7 @@ import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import static org.mockito.Mockito.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.geode.cache.Cache; @@ -49,13 +51,13 @@ public class LuceneIndexImplJUnitTest { @Test public void waitUnitFlushedWithMissingAEQThrowsIllegalArgument() throws Exception { thrown.expect(IllegalArgumentException.class); - index.waitUntilFlushed(MAX_WAIT); + index.waitUntilFlushed(MAX_WAIT, TimeUnit.MILLISECONDS); } @Test public void waitUnitFlushedWaitsForFlush() throws Exception { final String expectedIndexName = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION); - final AsyncEventQueue queue = mock(AsyncEventQueue.class); + final AsyncEventQueueImpl queue = mock(AsyncEventQueueImpl.class); when(cache.getAsyncEventQueue(eq(expectedIndexName))).thenReturn(queue); AtomicInteger callCount = new AtomicInteger(); @@ -69,7 +71,7 @@ public class LuceneIndexImplJUnitTest { return 0; } }); - index.waitUntilFlushed(MAX_WAIT); + index.waitUntilFlushed(MAX_WAIT, TimeUnit.MILLISECONDS); verify(cache).getAsyncEventQueue(eq(expectedIndexName)); } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java index 10127ac..d194081 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java @@ -35,6 +35,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.IOException; +import java.util.concurrent.TimeUnit; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.junit.Assert.assertNotNull; @@ -73,7 +74,8 @@ public class LuceneIndexRecoveryHAIntegrationTest { * construct index. This test simulates the same. */ // @Test - public void recoverRepoInANewNode() throws BucketNotFoundException, IOException { + public void recoverRepoInANewNode() + throws BucketNotFoundException, IOException, InterruptedException { LuceneServiceImpl service = (LuceneServiceImpl) LuceneServiceProvider.get(cache); service.createIndex("index1", "/userRegion", indexedFields); PartitionAttributes<String, String> attrs = @@ -87,7 +89,7 @@ public class LuceneIndexRecoveryHAIntegrationTest { (LuceneIndexForPartitionedRegion) service.getIndex("index1", "/userRegion"); // put an entry to create the bucket userRegion.put("rebalance", "test"); - index.waitUntilFlushed(30000); + index.waitUntilFlushed(30000, TimeUnit.MILLISECONDS); RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper); IndexRepository repo = manager.getRepository(userRegion, 0, null); @@ -112,9 +114,10 @@ public class LuceneIndexRecoveryHAIntegrationTest { - private void verifyIndexFinishFlushing(String indexName, String regionName) { + private void verifyIndexFinishFlushing(String indexName, String regionName) + throws InterruptedException { LuceneIndex index = LuceneServiceProvider.get(cache).getIndex(indexName, regionName); - boolean flushed = index.waitUntilFlushed(60000); + boolean flushed = index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); assertTrue(flushed); } } http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java index f0f1579..1c8bc88 100755 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/cli/LuceneIndexCommandsDUnitTest.java @@ -610,7 +610,8 @@ public class LuceneIndexCommandsDUnitTest extends CliCommandTestBase { LuceneService luceneService = LuceneServiceProvider.get(getCache()); Region region = getCache().getRegion(REGION_NAME); region.putAll(entries); - luceneService.getIndex(INDEX_NAME, REGION_NAME).waitUntilFlushed(60000); + luceneService.getIndex(INDEX_NAME, REGION_NAME).waitUntilFlushed(60000, + TimeUnit.MILLISECONDS); LuceneIndexImpl index = (LuceneIndexImpl) luceneService.getIndex(INDEX_NAME, REGION_NAME); Awaitility.await().atMost(65, TimeUnit.SECONDS) .until(() -> assertEquals(countOfDocuments, index.getIndexStats().getDocuments())); http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java index 697166e..5c47e13 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/directory/DumpDirectoryFilesIntegrationTest.java @@ -18,6 +18,7 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.*; import static org.junit.Assert.*; import java.io.File; +import java.util.concurrent.TimeUnit; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; @@ -55,7 +56,7 @@ public class DumpDirectoryFilesIntegrationTest extends LuceneIntegrationTest { InternalLuceneIndex index = (InternalLuceneIndex) luceneService.getIndex(INDEX_NAME, REGION_NAME); - index.waitUntilFlushed(60000); + index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); index.dumpFiles(diskDirRule.get().getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/geode/blob/44cd72d8/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java index d8816a3..21c0bbc 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.geode.cache.Cache; @@ -86,10 +87,11 @@ public class LuceneTestUtilities { LuceneServiceProvider.get(cache).createIndex(INDEX_NAME, REGION_NAME, fieldNames); } - public static void verifyIndexFinishFlushing(Cache cache, String indexName, String regionName) { + public static void verifyIndexFinishFlushing(Cache cache, String indexName, String regionName) + throws InterruptedException { LuceneService luceneService = LuceneServiceProvider.get(cache); LuceneIndex index = luceneService.getIndex(indexName, regionName); - boolean flushed = index.waitUntilFlushed(60000); + boolean flushed = index.waitUntilFlushed(60000, TimeUnit.MILLISECONDS); assertTrue(flushed); }