This is an automated email from the ASF dual-hosted git repository.

donalevans pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new eccd4f0  GEODE-8536: Allow limited retries when creating Lucene 
IndexWriter (#5553)
eccd4f0 is described below

commit eccd4f0d867579990a6a5de3dc070d45ef8bb8fb
Author: Donal Evans <[email protected]>
AuthorDate: Fri Oct 2 18:43:18 2020 -0700

    GEODE-8536: Allow limited retries when creating Lucene IndexWriter (#5553)
    
    Authored-by: Donal Evans <[email protected]>
---
 .../IndexRepositoryFactoryDistributedTest.java     |   2 -
 .../IndexRepositoryFactoryIntegrationTest.java     | 107 +++++++++++++++++++++
 .../lucene/internal/IndexRepositoryFactory.java    |  41 ++++++--
 .../internal/IndexRepositoryFactoryTest.java       |  45 +++++++--
 4 files changed, 177 insertions(+), 18 deletions(-)

diff --git 
a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
 
b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
index a3f245d..bfe921f 100644
--- 
a/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
+++ 
b/geode-lucene/src/distributedTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryDistributedTest.java
@@ -25,7 +25,6 @@ import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
-import junitparams.Parameters;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.Before;
@@ -111,7 +110,6 @@ public class IndexRepositoryFactoryDistributedTest 
implements Serializable {
   }
 
   @Test
-  @Parameters()
   public void lockedBucketShouldPreventPrimaryFromMoving() {
     dataStore1.invoke(this::initDataStoreAndLuceneIndex);
     dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
diff --git 
a/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
 
b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
new file mode 100644
index 0000000..0506b75
--- /dev/null
+++ 
b/geode-lucene/src/integrationTest/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryIntegrationTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.lucene.LuceneQuery;
+import org.apache.geode.cache.lucene.LuceneQueryException;
+import org.apache.geode.cache.lucene.LuceneServiceProvider;
+import org.apache.geode.cache.lucene.test.TestObject;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+public class IndexRepositoryFactoryIntegrationTest {
+  public Cache cache;
+  public static final String INDEX_NAME = "testIndex";
+  public static final String REGION_NAME = "testRegion";
+  private IndexRepositoryFactory spyFactory;
+  private LuceneQuery<Object, Object> luceneQuery;
+
+  @Before
+  public void setUp() {
+    cache = new CacheFactory().create();
+    LuceneServiceProvider.get(cache).createIndexFactory().setFields("field1", 
"field2")
+        .create(INDEX_NAME, REGION_NAME);
+
+    Region<Object, Object> dataRegion =
+        
cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+
+    dataRegion.put("A", new TestObject());
+
+    spyFactory = spy(new IndexRepositoryFactory());
+    PartitionedRepositoryManager.indexRepositoryFactory = spyFactory;
+
+    luceneQuery = LuceneServiceProvider.get(cache).createLuceneQueryFactory()
+        .create(INDEX_NAME, REGION_NAME, "hello", "field1");
+  }
+
+  @After
+  public void tearDown() {
+    if (cache != null) {
+      cache.close();
+    }
+  }
+
+  @Test
+  public void 
shouldRetryWhenIOExceptionEncounteredOnceDuringComputingRepository()
+      throws IOException, LuceneQueryException {
+    // To ensure that the specific bucket used in the query throws the 
IOException to trigger the
+    // retry, throw once for every bucket in the region
+    int timesToThrow = ((PartitionedRegion) 
cache.getRegion(REGION_NAME)).getTotalNumberOfBuckets();
+
+    doAnswer(new Answer<Object>() {
+      private int times = 0;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (times < timesToThrow) {
+          times++;
+          throw new IOException();
+        }
+        return invocation.callRealMethod();
+      }
+    }).when(spyFactory).getIndexWriter(any(), any());
+
+    luceneQuery.findKeys();
+  }
+
+  @Test
+  public void 
shouldThrowInternalGemfireErrorWhenIOExceptionEncounteredConsistentlyDuringComputingRepository()
+      throws IOException {
+    doThrow(new IOException()).when(spyFactory).getIndexWriter(any(), any());
+
+    
assertThatThrownBy(luceneQuery::findKeys).isInstanceOf(FunctionException.class)
+        .hasCauseInstanceOf(InternalGemFireError.class);
+  }
+}
diff --git 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index 7674a45..7db8b96 100644
--- 
a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ 
b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -44,6 +44,7 @@ public class IndexRepositoryFactory {
   private static final Logger logger = LogService.getLogger();
   public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = 
"FileRegionLockForBucketId:";
   public static final String APACHE_GEODE_INDEX_COMPLETE = 
"APACHE_GEODE_INDEX_COMPLETE";
+  protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200;
 
   public IndexRepositoryFactory() {}
 
@@ -74,7 +75,8 @@ public class IndexRepositoryFactory {
    * This is a util function just to not let computeIndexRepository be a huge 
chunk of code.
    */
   protected IndexRepository finishComputingRepository(Integer bucketId, 
LuceneSerializer serializer,
-      PartitionedRegion userRegion, IndexRepository oldRepository, 
InternalLuceneIndex index) {
+      PartitionedRegion userRegion, IndexRepository oldRepository, 
InternalLuceneIndex index)
+      throws IOException {
     LuceneIndexForPartitionedRegion indexForPR = 
(LuceneIndexForPartitionedRegion) index;
     final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
     BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
@@ -129,7 +131,7 @@ public class IndexRepositoryFactory {
     } catch (IOException e) {
       logger.warn("Exception thrown while constructing Lucene Index for 
bucket:" + bucketId
           + " for file region:" + fileAndChunkBucket.getFullPath(), e);
-      return null;
+      throw e;
     } catch (CacheClosedException e) {
       logger.info("CacheClosedException thrown while constructing Lucene Index 
for bucket:"
           + bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
@@ -144,11 +146,34 @@ public class IndexRepositoryFactory {
 
   protected IndexWriter buildIndexWriter(int bucketId, BucketRegion 
fileAndChunkBucket,
       LuceneIndexForPartitionedRegion indexForPR) throws IOException {
-    // bucketTargetingMap handles partition resolver (via bucketId as 
callbackArg)
-    Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, 
bucketId);
-    RegionDirectory dir = new RegionDirectory(bucketTargetingMap, 
indexForPR.getFileSystemStats());
-    IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
+    int attempts = 0;
+    // IOExceptions can occur if the fileAndChunk region is being modified 
while the IndexWriter is
+    // being initialized, so allow limited retries here to account for that 
timing window
+    while (true) {
+      // bucketTargetingMap handles partition resolver (via bucketId as 
callbackArg)
+      Map<Object, Object> bucketTargetingMap = 
getBucketTargetingMap(fileAndChunkBucket, bucketId);
+      RegionDirectory dir =
+          new RegionDirectory(bucketTargetingMap, 
indexForPR.getFileSystemStats());
+      IndexWriterConfig config = new 
IndexWriterConfig(indexForPR.getAnalyzer());
+      try {
+        attempts++;
+        return getIndexWriter(dir, config);
+      } catch (IOException e) {
+        if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) {
+          throw e;
+        }
+        logger.info("Encountered {} while attempting to get IndexWriter for 
index {}. Retrying...",
+            e, indexForPR.getName());
+        try {
+          Thread.sleep(5);
+        } catch (InterruptedException ignore) {
+        }
+      }
+    }
+  }
 
+  protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig 
config)
+      throws IOException {
     return new IndexWriter(dir, config);
   }
 
@@ -186,8 +211,8 @@ public class IndexRepositoryFactory {
     return value;
   }
 
-  protected Map getBucketTargetingMap(BucketRegion region, int bucketId) {
-    return new BucketTargetingMap(region, bucketId);
+  protected Map<Object, Object> getBucketTargetingMap(BucketRegion region, int 
bucketId) {
+    return new BucketTargetingMap<>(region, bucketId);
   }
 
   protected String getLockName(final BucketRegion fileAndChunkBucket) {
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
index e301dcf..38e6355 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactoryTest.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.cache.lucene.internal;
 
+import static 
org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -22,11 +23,13 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 
+import org.apache.lucene.index.IndexWriter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -77,7 +80,8 @@ public class IndexRepositoryFactoryTest {
   }
 
   @Test
-  public void 
finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
 {
+  public void 
finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
+      throws IOException {
     doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, 
bucketId);
 
     IndexRepository indexRepository = 
indexRepositoryFactory.finishComputingRepository(0,
@@ -87,7 +91,8 @@ public class IndexRepositoryFactoryTest {
   }
 
   @Test
-  public void 
finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
 {
+  public void 
finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
+      throws IOException {
     when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false);
 
     IndexRepository indexRepository = 
indexRepositoryFactory.finishComputingRepository(0,
@@ -97,7 +102,8 @@ public class IndexRepositoryFactoryTest {
   }
 
   @Test
-  public void 
finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() {
+  public void 
finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed()
+      throws IOException {
     when(oldRepository.isClosed()).thenReturn(false);
     when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
 
@@ -108,7 +114,8 @@ public class IndexRepositoryFactoryTest {
   }
 
   @Test
-  public void 
finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
 {
+  public void 
finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
+      throws IOException {
     when(oldRepository.isClosed()).thenReturn(true);
     
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false);
     when(distributedLockService.lock(any(), anyLong(), 
anyLong())).thenReturn(false);
@@ -119,7 +126,7 @@ public class IndexRepositoryFactoryTest {
   }
 
   @Test
-  public void 
finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
+  public void 
finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
       throws IOException {
     when(oldRepository.isClosed()).thenReturn(true);
     when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
@@ -127,9 +134,8 @@ public class IndexRepositoryFactoryTest {
     doThrow(new IOException("Test Exception")).when(indexRepositoryFactory)
         .buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);
 
-    IndexRepository indexRepository = 
indexRepositoryFactory.finishComputingRepository(0,
-        serializer, userRegion, oldRepository, luceneIndex);
-    assertThat(indexRepository).isNull();
+    assertThatThrownBy(() -> 
indexRepositoryFactory.finishComputingRepository(0,
+        serializer, userRegion, oldRepository, 
luceneIndex)).isInstanceOf(IOException.class);
     verify(distributedLockService).unlock(any());
   }
 
@@ -146,4 +152,27 @@ public class IndexRepositoryFactoryTest {
         userRegion, oldRepository, 
luceneIndex)).isInstanceOf(CacheClosedException.class);
     verify(distributedLockService).unlock(any());
   }
+
+  @Test
+  public void 
buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered()
+      throws IOException {
+    IndexWriter writer = mock(IndexWriter.class);
+    doThrow(new 
IOException()).doReturn(writer).when(indexRepositoryFactory).getIndexWriter(any(),
+        any());
+    assertThat(indexRepositoryFactory.buildIndexWriter(bucketId, 
fileAndChunkBucket, luceneIndex))
+        .isEqualTo(writer);
+    verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any());
+  }
+
+  @Test
+  public void 
buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered()
+      throws IOException {
+    IOException testException = new IOException("Test exception");
+    doThrow(testException).when(indexRepositoryFactory).getIndexWriter(any(), 
any());
+    assertThatThrownBy(
+        () -> indexRepositoryFactory.buildIndexWriter(bucketId, 
fileAndChunkBucket, luceneIndex))
+            .isEqualTo(testException);
+    verify(indexRepositoryFactory, 
times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(),
+        any());
+  }
 }

Reply via email to