This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch nouveau-indexmanager-improvements
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f26833ae303ee1dc45b6a0b68a45aa5eb565bd99
Author: Robert Newson <[email protected]>
AuthorDate: Wed Oct 4 11:26:41 2023 +0100

    move commit scheduling to refreshAfterWrite
---
 .../apache/couchdb/nouveau/NouveauApplication.java |  5 ---
 .../org/apache/couchdb/nouveau/core/Index.java     | 22 +---------
 .../apache/couchdb/nouveau/core/IndexManager.java  | 49 ++++++++++------------
 .../nouveau/health/IndexHealthCheckTest.java       |  6 +--
 4 files changed, 25 insertions(+), 57 deletions(-)

diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
index dadea5d8e..a7cfdeed6 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java
@@ -46,11 +46,6 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat
         indexManager.setMaxIndexesOpen(configuration.getMaxIndexesOpen());
         indexManager.setMetricRegistry(environment.metrics());
         indexManager.setSearcherFactory(new ParallelSearcherFactory(ForkJoinPool.commonPool()));
-        indexManager.setScheduler(environment
-                .lifecycle()
-                .scheduledExecutorService("index-manager-%d")
-                .threads(5)
-                .build());
         indexManager.setObjectMapper(environment.getObjectMapper());
         indexManager.setRootDir(configuration.getRootDir());
         environment.lifecycle().manage(indexManager);
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index 848a29c51..2fba28340 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -18,7 +18,6 @@ import jakarta.ws.rs.core.Response.Status;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
 import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
 import org.apache.couchdb.nouveau.api.IndexInfo;
@@ -40,7 +39,6 @@ public abstract class Index implements Closeable {
     private long updateSeq;
     private long purgeSeq;
     private boolean deleteOnClose = false;
-    private long lastCommit = now();
     private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
 
     protected Index(final long updateSeq, final long purgeSeq) {
@@ -104,14 +102,7 @@ public abstract class Index implements Closeable {
             updateSeq = this.updateSeq;
             purgeSeq = this.purgeSeq;
         }
-        final boolean result = doCommit(updateSeq, purgeSeq);
-        if (result) {
-            final long now = now();
-            synchronized (this) {
-                this.lastCommit = now;
-            }
-        }
-        return result;
+        return doCommit(updateSeq, purgeSeq);
     }
 
     protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException;
@@ -195,15 +186,4 @@ public abstract class Index implements Closeable {
         assertPurgeSeqProgress(matchSeq, purgeSeq);
         this.purgeSeq = purgeSeq;
     }
-
-    public boolean needsCommit(final long duration, final TimeUnit unit) {
-        final long commitNeededSince = now() - unit.toNanos(duration);
-        synchronized (this) {
-            return this.lastCommit < commitNeededSince;
-        }
-    }
-
-    private long now() {
-        return System.nanoTime();
-    }
 }
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
index 7df9eb03e..d27c709c0 100644
--- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
@@ -39,8 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.stream.Stream;
 import org.apache.couchdb.nouveau.api.IndexDefinition;
@@ -85,8 +83,6 @@ public final class IndexManager implements Managed {
 
     private SearcherFactory searcherFactory;
 
-    private ScheduledExecutorService scheduler;
-
     private AsyncLoadingCache<String, Index> cache;
 
     private StripedLock<String> lock;
@@ -103,24 +99,7 @@ public final class IndexManager implements Managed {
 
             if (index.tryAcquire()) {
                 try {
-                    final R result = indexFun.apply(index);
-                    if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) {
-                        scheduler.execute(() -> {
-                            if (index.tryAcquire()) {
-                                try {
-                                    LOGGER.debug("committing {}", name);
-                                    try {
-                                        index.commit();
-                                    } catch (final IOException e) {
-                                        LOGGER.warn("I/O exception while committing " + name, e);
-                                    }
-                                } finally {
-                                    index.release();
-                                }
-                            }
-                        });
-                    }
-                    return result;
+                    return indexFun.apply(index);
                 } finally {
                     index.release();
                 }
@@ -235,10 +214,6 @@ public final class IndexManager implements Managed {
         this.idleSeconds = idleSeconds;
     }
 
-    public void setScheduler(ScheduledExecutorService scheduler) {
-        this.scheduler = scheduler;
-    }
-
     public Path getRootDir() {
         return rootDir;
     }
@@ -267,6 +242,7 @@ public final class IndexManager implements Managed {
                 .maximumWeight(maxIndexesOpen)
                 .weigher(new IndexWeigher())
                 .expireAfterAccess(Duration.ofSeconds(idleSeconds))
+                .refreshAfterWrite(Duration.ofSeconds(commitIntervalSeconds))
                 .scheduler(Scheduler.systemScheduler())
                 .evictionListener(new IndexEvictionListener())
                 .buildAsync(new AsyncIndexLoader());
@@ -359,6 +335,27 @@ public final class IndexManager implements Managed {
             return future;
         }
 
+        @Override
+        public CompletableFuture<? extends Index> asyncReload(String name, Index index, Executor executor)
+                throws Exception {
+            executor.execute(() -> {
+                if (index.tryAcquire()) {
+                    try {
+                        try {
+                            if (index.commit()) {
+                                LOGGER.info("committed {}", name);
+                            }
+                        } catch (final IOException e) {
+                            LOGGER.warn("I/O exception while committing " + name, e);
+                        }
+                    } finally {
+                        index.release();
+                    }
+                }
+            });
+            return CompletableFuture.completedFuture(index);
+        }
+
         private long getSeq(final IndexWriter writer, final String key) throws IOException {
             final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData();
             if (commitData == null) {
diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
index 38a34d548..4bae7a7ad 100644
--- a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
+++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java
@@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.nio.file.Path;
-import java.util.concurrent.Executors;
 import org.apache.couchdb.nouveau.core.IndexManager;
 import org.apache.couchdb.nouveau.resources.IndexResource;
 import org.apache.lucene.search.SearcherFactory;
@@ -29,22 +28,19 @@ public class IndexHealthCheckTest {
 
     @Test
     public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception {
-        var scheduler = Executors.newSingleThreadScheduledExecutor();
         var manager = new IndexManager();
+        manager.setCommitIntervalSeconds(1);
         manager.setObjectMapper(new ObjectMapper());
         manager.setMetricRegistry(new MetricRegistry());
         manager.setRootDir(tempDir);
         manager.setSearcherFactory(new SearcherFactory());
-        manager.setScheduler(scheduler);
         manager.start();
 
         try {
             var resource = new IndexResource(manager);
             var check = new IndexHealthCheck(resource);
             assertTrue(check.check().isHealthy());
-
         } finally {
-            scheduler.shutdown();
             manager.stop();
         }
     }

Reply via email to