This is an automated email from the ASF dual-hosted git repository.
donalevans pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new eccd4f0 GEODE-8536: Allow limited retries when creating Lucene IndexWriter (#5553)
eccd4f0 is described below
commit eccd4f0d867579990a6a5de3dc070d45ef8bb8fb
Author: Donal Evans <[email protected]>
AuthorDate: Fri Oct 2 18:43:18 2020 -0700
GEODE-8536: Allow limited retries when creating Lucene IndexWriter (#5553)
Authored-by: Donal Evans <[email protected]>
---
.../IndexRepositoryFactoryDistributedTest.java | 2 -
.../IndexRepositoryFactoryIntegrationTest.java | 107 +++++++++++++++++++++
.../lucene/internal/IndexRepositoryFactory.java | 41 ++++++--
.../internal/IndexRepositoryFactoryTest.java | 45 +++++++--
4 files changed, 177 insertions(+), 18 deletions(-)
diff --git a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
index a3f245d..bfe921f 100644
--- a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
+++ b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
@@ -25,7 +25,6 @@ import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
-import junitparams.Parameters;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
@@ -111,7 +110,6 @@ public class IndexRepositoryFactoryDistributedTest implements Serializable {
}
@Test
- @Parameters()
public void lockedBucketShouldPreventPrimaryFromMoving() {
dataStore1.invoke(this::initDataStoreAndLuceneIndex);
dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
diff --git a/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
new file mode 100644
index 0000000..0506b75
--- /dev/null
+++ b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.lucene.LuceneQuery;
+import org.apache.geode.cache.lucene.LuceneQueryException;
+import org.apache.geode.cache.lucene.LuceneServiceProvider;
+import org.apache.geode.cache.lucene.test.TestObject;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+public class IndexRepositoryFactoryIntegrationTest {
+ public Cache cache;
+ public static final String INDEX_NAME = "testIndex";
+ public static final String REGION_NAME = "testRegion";
+ private IndexRepositoryFactory spyFactory;
+ private LuceneQuery<Object, Object> luceneQuery;
+
+ @Before
+ public void setUp() {
+ cache = new CacheFactory().create();
+ LuceneServiceProvider.get(cache).createIndexFactory().setFields("field1", "field2")
+ .create(INDEX_NAME, REGION_NAME);
+
+ Region<Object, Object> dataRegion =
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+
+ dataRegion.put("A", new TestObject());
+
+ spyFactory = spy(new IndexRepositoryFactory());
+ PartitionedRepositoryManager.indexRepositoryFactory = spyFactory;
+
+ luceneQuery = LuceneServiceProvider.get(cache).createLuceneQueryFactory()
+ .create(INDEX_NAME, REGION_NAME, "hello", "field1");
+ }
+
+ @After
+ public void tearDown() {
+ if (cache != null) {
+ cache.close();
+ }
+ }
+
+ @Test
+ public void shouldRetryWhenIOExceptionEncounteredOnceDuringComputingRepository()
+ throws IOException, LuceneQueryException {
+ // To ensure that the specific bucket used in the query throws the IOException to trigger the
+ // retry, throw once for every bucket in the region
+ int timesToThrow = ((PartitionedRegion) cache.getRegion(REGION_NAME)).getTotalNumberOfBuckets();
+
+ doAnswer(new Answer<Object>() {
+ private int times = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (times < timesToThrow) {
+ times++;
+ throw new IOException();
+ }
+ return invocation.callRealMethod();
+ }
+ }).when(spyFactory).getIndexWriter(any(), any());
+
+ luceneQuery.findKeys();
+ }
+
+ @Test
+ public void shouldThrowInternalGemfireErrorWhenIOExceptionEncounteredConsistentlyDuringComputingRepository()
+ throws IOException {
+ doThrow(new IOException()).when(spyFactory).getIndexWriter(any(), any());
+
+ assertThatThrownBy(luceneQuery::findKeys).isInstanceOf(FunctionException.class)
+ .hasCauseInstanceOf(InternalGemFireError.class);
+ }
+}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index 7674a45..7db8b96 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -44,6 +44,7 @@ public class IndexRepositoryFactory {
private static final Logger logger = LogService.getLogger();
public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
+ protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200;
public IndexRepositoryFactory() {}
@@ -74,7 +75,8 @@ public class IndexRepositoryFactory {
* This is a util function just to not let computeIndexRepository be a huge chunk of code.
*/
protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
- PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) {
+ PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
+ throws IOException {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
@@ -129,7 +131,7 @@ public class IndexRepositoryFactory {
} catch (IOException e) {
logger.warn("Exception thrown while constructing Lucene Index for bucket:" + bucketId
+ " for file region:" + fileAndChunkBucket.getFullPath(), e);
- return null;
+ throw e;
} catch (CacheClosedException e) {
logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
+ bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
@@ -144,11 +146,34 @@ public class IndexRepositoryFactory {
protected IndexWriter buildIndexWriter(int bucketId, BucketRegion fileAndChunkBucket,
LuceneIndexForPartitionedRegion indexForPR) throws IOException {
- // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
- Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
- RegionDirectory dir = new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
- IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
+ int attempts = 0;
+ // IOExceptions can occur if the fileAndChunk region is being modified while the IndexWriter is
+ // being initialized, so allow limited retries here to account for that timing window
+ while (true) {
+ // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+ Map<Object, Object> bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+ RegionDirectory dir =
+ new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
+ IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
+ try {
+ attempts++;
+ return getIndexWriter(dir, config);
+ } catch (IOException e) {
+ if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) {
+ throw e;
+ }
+ logger.info("Encountered {} while attempting to get IndexWriter for index {}. Retrying...",
+ e, indexForPR.getName());
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+ protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig config)
+ throws IOException {
return new IndexWriter(dir, config);
}
@@ -186,8 +211,8 @@ public class IndexRepositoryFactory {
return value;
}
- protected Map getBucketTargetingMap(BucketRegion region, int bucketId) {
- return new BucketTargetingMap(region, bucketId);
+ protected Map<Object, Object> getBucketTargetingMap(BucketRegion region, int bucketId) {
+ return new BucketTargetingMap<>(region, bucketId);
}
protected String getLockName(final BucketRegion fileAndChunkBucket) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
index e301dcf..38e6355 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.lucene.internal;
+import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
@@ -22,11 +23,13 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import org.apache.lucene.index.IndexWriter;
import org.junit.Before;
import org.junit.Test;
@@ -77,7 +80,8 @@ public class IndexRepositoryFactoryTest {
}
@Test
- public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() {
+ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
+ throws IOException {
doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, bucketId);
IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
@@ -87,7 +91,8 @@ public class IndexRepositoryFactoryTest {
}
@Test
- public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() {
+ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
+ throws IOException {
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false);
IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
@@ -97,7 +102,8 @@ public class IndexRepositoryFactoryTest {
}
@Test
- public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() {
+ public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed()
+ throws IOException {
when(oldRepository.isClosed()).thenReturn(false);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
@@ -108,7 +114,8 @@ public class IndexRepositoryFactoryTest {
}
@Test
- public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() {
+ public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
+ throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(false);
@@ -119,7 +126,7 @@ public class IndexRepositoryFactoryTest {
}
@Test
- public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
+ public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
@@ -127,9 +134,8 @@ public class IndexRepositoryFactoryTest {
doThrow(new IOException("Test Exception")).when(indexRepositoryFactory)
.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);
- IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
- serializer, userRegion, oldRepository, luceneIndex);
- assertThat(indexRepository).isNull();
+ assertThatThrownBy(() -> indexRepositoryFactory.finishComputingRepository(0,
+ serializer, userRegion, oldRepository, luceneIndex)).isInstanceOf(IOException.class);
verify(distributedLockService).unlock(any());
}
@@ -146,4 +152,27 @@ public class IndexRepositoryFactoryTest {
userRegion, oldRepository, luceneIndex)).isInstanceOf(CacheClosedException.class);
verify(distributedLockService).unlock(any());
}
+
+ @Test
+ public void buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered()
+ throws IOException {
+ IndexWriter writer = mock(IndexWriter.class);
+ doThrow(new IOException()).doReturn(writer).when(indexRepositoryFactory).getIndexWriter(any(),
+ any());
+ assertThat(indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
+ .isEqualTo(writer);
+ verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any());
+ }
+
+ @Test
+ public void buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered()
+ throws IOException {
+ IOException testException = new IOException("Test exception");
+ doThrow(testException).when(indexRepositoryFactory).getIndexWriter(any(), any());
+ assertThatThrownBy(
+ () -> indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
+ .isEqualTo(testException);
+ verify(indexRepositoryFactory, times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(),
+ any());
+ }
}