This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-indexmanager-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f26833ae303ee1dc45b6a0b68a45aa5eb565bd99 Author: Robert Newson <[email protected]> AuthorDate: Wed Oct 4 11:26:41 2023 +0100 move commit scheduling to refreshAfterWrite --- .../apache/couchdb/nouveau/NouveauApplication.java | 5 --- .../org/apache/couchdb/nouveau/core/Index.java | 22 +--------- .../apache/couchdb/nouveau/core/IndexManager.java | 49 ++++++++++------------ .../nouveau/health/IndexHealthCheckTest.java | 6 +-- 4 files changed, 25 insertions(+), 57 deletions(-) diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java index dadea5d8e..a7cfdeed6 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java @@ -46,11 +46,6 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat indexManager.setMaxIndexesOpen(configuration.getMaxIndexesOpen()); indexManager.setMetricRegistry(environment.metrics()); indexManager.setSearcherFactory(new ParallelSearcherFactory(ForkJoinPool.commonPool())); - indexManager.setScheduler(environment - .lifecycle() - .scheduledExecutorService("index-manager-%d") - .threads(5) - .build()); indexManager.setObjectMapper(environment.getObjectMapper()); indexManager.setRootDir(configuration.getRootDir()); environment.lifecycle().manage(indexManager); diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java index 848a29c51..2fba28340 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -18,7 +18,6 @@ import jakarta.ws.rs.core.Response.Status; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import org.apache.couchdb.nouveau.api.DocumentDeleteRequest; import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; import org.apache.couchdb.nouveau.api.IndexInfo; @@ -40,7 +39,6 @@ public abstract class Index implements Closeable { private long updateSeq; private long purgeSeq; private boolean deleteOnClose = false; - private long lastCommit = now(); private final Semaphore permits = new Semaphore(Integer.MAX_VALUE); protected Index(final long updateSeq, final long purgeSeq) { @@ -104,14 +102,7 @@ public abstract class Index implements Closeable { updateSeq = this.updateSeq; purgeSeq = this.purgeSeq; } - final boolean result = doCommit(updateSeq, purgeSeq); - if (result) { - final long now = now(); - synchronized (this) { - this.lastCommit = now; - } - } - return result; + return doCommit(updateSeq, purgeSeq); } protected abstract boolean doCommit(final long updateSeq, final long purgeSeq) throws IOException; @@ -195,15 +186,4 @@ public abstract class Index implements Closeable { assertPurgeSeqProgress(matchSeq, purgeSeq); this.purgeSeq = purgeSeq; } - - public boolean needsCommit(final long duration, final TimeUnit unit) { - final long commitNeededSince = now() - unit.toNanos(duration); - synchronized (this) { - return this.lastCommit < commitNeededSince; - } - } - - private long now() { - return System.nanoTime(); - } } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index 7df9eb03e..d27c709c0 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -39,8 +39,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.Stream; import org.apache.couchdb.nouveau.api.IndexDefinition; @@ -85,8 +83,6 @@ public final class IndexManager implements Managed { private SearcherFactory searcherFactory; - private ScheduledExecutorService scheduler; - private AsyncLoadingCache<String, Index> cache; private StripedLock<String> lock; @@ -103,24 +99,7 @@ public final class IndexManager implements Managed { if (index.tryAcquire()) { try { - final R result = indexFun.apply(index); - if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) { - scheduler.execute(() -> { - if (index.tryAcquire()) { - try { - LOGGER.debug("committing {}", name); - try { - index.commit(); - } catch (final IOException e) { - LOGGER.warn("I/O exception while committing " + name, e); - } - } finally { - index.release(); - } - } - }); - } - return result; + return indexFun.apply(index); } finally { index.release(); } @@ -235,10 +214,6 @@ public final class IndexManager implements Managed { this.idleSeconds = idleSeconds; } - public void setScheduler(ScheduledExecutorService scheduler) { - this.scheduler = scheduler; - } - public Path getRootDir() { return rootDir; } @@ -267,6 +242,7 @@ public final class IndexManager implements Managed { .maximumWeight(maxIndexesOpen) .weigher(new IndexWeigher()) .expireAfterAccess(Duration.ofSeconds(idleSeconds)) + .refreshAfterWrite(Duration.ofSeconds(commitIntervalSeconds)) .scheduler(Scheduler.systemScheduler()) .evictionListener(new IndexEvictionListener()) .buildAsync(new AsyncIndexLoader()); @@ -359,6 +335,27 @@ public final class IndexManager implements Managed { return future; } + @Override + public CompletableFuture<? extends Index> asyncReload(String name, Index index, Executor executor) + throws Exception { + executor.execute(() -> { + if (index.tryAcquire()) { + try { + try { + if (index.commit()) { + LOGGER.info("committed {}", name); + } + } catch (final IOException e) { + LOGGER.warn("I/O exception while committing " + name, e); + } + } finally { + index.release(); + } + } + }); + return CompletableFuture.completedFuture(index); + } + private long getSeq(final IndexWriter writer, final String key) throws IOException { final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData(); if (commitData == null) { diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java index 38a34d548..4bae7a7ad 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java @@ -18,7 +18,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.file.Path; -import java.util.concurrent.Executors; import org.apache.couchdb.nouveau.core.IndexManager; import org.apache.couchdb.nouveau.resources.IndexResource; import org.apache.lucene.search.SearcherFactory; @@ -29,22 +28,19 @@ public class IndexHealthCheckTest { @Test public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception { - var scheduler = Executors.newSingleThreadScheduledExecutor(); var manager = new IndexManager(); + manager.setCommitIntervalSeconds(1); manager.setObjectMapper(new ObjectMapper()); manager.setMetricRegistry(new MetricRegistry()); manager.setRootDir(tempDir); manager.setSearcherFactory(new SearcherFactory()); - manager.setScheduler(scheduler); manager.start(); try { var resource = new IndexResource(manager); var check = new IndexHealthCheck(resource); assertTrue(check.check().isHealthy()); - } finally { - scheduler.shutdown(); manager.stop(); } }
