This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-indexmanager-improvements in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit cf00704776b9596d20100f57c33a364f3e6413d0 Author: Robert Newson <[email protected]> AuthorDate: Wed Oct 4 10:12:16 2023 +0100 switch to AsyncLoadingCache --- .../apache/couchdb/nouveau/NouveauApplication.java | 5 +- .../apache/couchdb/nouveau/core/IndexLoader.java | 24 ----- .../apache/couchdb/nouveau/core/IndexManager.java | 103 ++++++++++++++++----- .../couchdb/nouveau/resources/IndexResource.java | 53 ++--------- .../nouveau/health/IndexHealthCheckTest.java | 5 +- .../couchdb/nouveau/lucene9/Lucene9IndexTest.java | 21 ++--- 6 files changed, 100 insertions(+), 111 deletions(-) diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java index 3039214b5..dadea5d8e 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java @@ -25,7 +25,6 @@ import org.apache.couchdb.nouveau.lucene9.ParallelSearcherFactory; import org.apache.couchdb.nouveau.resources.AnalyzeResource; import org.apache.couchdb.nouveau.resources.IndexResource; import org.apache.couchdb.nouveau.tasks.CloseAllIndexesTask; -import org.apache.lucene.search.SearcherFactory; public class NouveauApplication extends Application<NouveauApplicationConfiguration> { @@ -46,6 +45,7 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat indexManager.setIdleSeconds(configuration.getIdleSeconds()); indexManager.setMaxIndexesOpen(configuration.getMaxIndexesOpen()); indexManager.setMetricRegistry(environment.metrics()); + indexManager.setSearcherFactory(new ParallelSearcherFactory(ForkJoinPool.commonPool())); indexManager.setScheduler(environment .lifecycle() .scheduledExecutorService("index-manager-%d") @@ -63,8 +63,7 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat environment.jersey().register(analyzeResource); // IndexResource - final SearcherFactory searcherFactory = new ParallelSearcherFactory(ForkJoinPool.commonPool()); - final IndexResource indexResource = new IndexResource(indexManager, searcherFactory); + final IndexResource indexResource = new IndexResource(indexManager); environment.jersey().register(indexResource); // Health checks diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexLoader.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexLoader.java deleted file mode 100644 index b5def1642..000000000 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexLoader.java +++ /dev/null @@ -1,24 +0,0 @@ -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.apache.couchdb.nouveau.core; - -import java.io.IOException; -import java.nio.file.Path; -import org.apache.couchdb.nouveau.api.IndexDefinition; - -@FunctionalInterface -public interface IndexLoader { - - Index apply(final Path path, final IndexDefinition indexDefinition) throws IOException; -} diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index cd4af687e..7df9eb03e 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -19,13 +19,13 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.caffeine.MetricsStatsCounter; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Scheduler; import com.github.benmanes.caffeine.cache.Weigher; - import io.dropwizard.lifecycle.Managed; import jakarta.ws.rs.WebApplicationException; import jakarta.ws.rs.core.Response.Status; @@ -36,13 +36,25 @@ import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.stream.Stream; import org.apache.couchdb.nouveau.api.IndexDefinition; +import org.apache.couchdb.nouveau.lucene9.Lucene9AnalyzerFactory; +import org.apache.couchdb.nouveau.lucene9.Lucene9Index; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.misc.store.DirectIODirectory; +import org.apache.lucene.search.SearcherFactory; +import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; import org.checkerframework.checker.index.qual.NonNegative; -import org.eclipse.jetty.io.RuntimeIOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,34 +83,23 @@ public final class IndexManager implements Managed { private MetricRegistry metricRegistry; + private SearcherFactory searcherFactory; + private ScheduledExecutorService scheduler; - private Cache<String, Index> cache; + private AsyncLoadingCache<String, Index> cache; private StripedLock<String> lock; - public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun) + public <R> R with(final String name, final IndexFunction<Index, R> indexFun) throws IOException, InterruptedException { while (true) { if (!exists(name)) { throw new WebApplicationException("Index does not exist", Status.NOT_FOUND); } - final Index index; - try { - index = cache.get(name, (n) -> { - LOGGER.info("opening {}", n); - final Path path = indexPath(n); - try { - final IndexDefinition indexDefinition = loadIndexDefinition(n); - return loader.apply(path, indexDefinition); - } catch (final IOException e) { - throw new RuntimeIOException(e); - } - }); - } catch (final RuntimeIOException e) { - throw (IOException) e.getCause(); - } + final CompletableFuture<Index> future = cache.get(name); + final Index index = future.join(); if (index.tryAcquire()) { try { @@ -196,7 +197,11 @@ public final class IndexManager implements Managed { } private void deleteIndex(final String name) throws IOException { - final Index index = cache.asMap().remove(name); + final CompletableFuture<Index> future = cache.asMap().remove(name); + if (future == null) { + return; + } + final Index index = future.getNow(null); if (index != null) { index.setDeleteOnClose(true); close(name, index); @@ -250,6 +255,10 @@ public final class IndexManager implements Managed { this.metricRegistry = metricRegistry; } + public void setSearcherFactory(final SearcherFactory searcherFactory) { + this.searcherFactory = searcherFactory; + } + @Override public void start() throws IOException { cache = Caffeine.newBuilder() @@ -260,7 +269,7 @@ public final class IndexManager implements Managed { .expireAfterAccess(Duration.ofSeconds(idleSeconds)) .scheduler(Scheduler.systemScheduler()) .evictionListener(new IndexEvictionListener()) - .build(); + .buildAsync(new AsyncIndexLoader()); lock = new StripedLock<String>(100); } @@ -318,7 +327,57 @@ public final class IndexManager implements Managed { // Pin active indexes return value.isActive() ? 0 : 1; } + } + + private class AsyncIndexLoader implements AsyncCacheLoader<String, Index> { + @Override + public CompletableFuture<? extends Index> asyncLoad(String name, Executor executor) throws Exception { + final CompletableFuture<Index> future = new CompletableFuture<Index>(); + + executor.execute(() -> { + LOGGER.info("opening {}", name); + final Path path = indexPath(name); + Index result; + try { + final IndexDefinition indexDefinition = loadIndexDefinition(name); + final Analyzer analyzer = Lucene9AnalyzerFactory.fromDefinition(indexDefinition); + final Directory dir = new DirectIODirectory(FSDirectory.open(path.resolve("9"))); + final IndexWriterConfig config = new IndexWriterConfig(analyzer); + config.setUseCompoundFile(false); + final IndexWriter writer = new IndexWriter(dir, config); + final long updateSeq = getSeq(writer, "update_seq"); + final long purgeSeq = getSeq(writer, "purge_seq"); + final SearcherManager searcherManager = new SearcherManager(writer, searcherFactory); + result = new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager); + future.complete(result); + } catch (IOException e) { + future.completeExceptionally(e); + } + }); + + return future; + } + + private long getSeq(final IndexWriter writer, final String key) throws IOException { + final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData(); + if (commitData == null) { + return 0L; + } + for (Map.Entry<String, String> entry : commitData) { + if (entry.getKey().equals(key)) { + return Long.parseLong(entry.getValue()); + } + } + return 0L; + } + } + + private void close(final String name, final CompletableFuture<Index> future) throws IOException { + final Index index = future.getNow(null); + if (index != null) { + close(name, index); + } } private void close(final String name, final Index index) throws IOException { diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index 9c7a100e3..a6ca2c47b 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -29,7 +29,6 @@ import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Objects; import org.apache.couchdb.nouveau.api.DocumentDeleteRequest; import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; @@ -38,18 +37,7 @@ import org.apache.couchdb.nouveau.api.IndexInfo; import org.apache.couchdb.nouveau.api.IndexInfoRequest; import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; -import org.apache.couchdb.nouveau.core.IndexLoader; import org.apache.couchdb.nouveau.core.IndexManager; -import org.apache.couchdb.nouveau.lucene9.Lucene9AnalyzerFactory; -import org.apache.couchdb.nouveau.lucene9.Lucene9Index; -import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.misc.store.DirectIODirectory; -import org.apache.lucene.search.SearcherFactory; -import org.apache.lucene.search.SearcherManager; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; @Path("/index/{name}") @Metered @@ -60,11 +48,9 @@ import org.apache.lucene.store.FSDirectory; public final class IndexResource { private final IndexManager indexManager; - private final SearcherFactory searcherFactory; - public IndexResource(final IndexManager indexManager, final SearcherFactory searcherFactory) { + public IndexResource(final IndexManager indexManager) { this.indexManager = Objects.requireNonNull(indexManager); - this.searcherFactory = Objects.requireNonNull(searcherFactory); } @PUT @@ -80,7 +66,7 @@ public final class IndexResource { @PathParam("docId") String docId, @NotNull @Valid DocumentDeleteRequest request) throws Exception { - indexManager.with(name, indexLoader(), (index) -> { + indexManager.with(name, (index) -> { index.delete(docId, request); return null; }); @@ -93,7 +79,7 @@ public final class IndexResource { @GET public IndexInfo getIndexInfo(@PathParam("name") String name) throws Exception { - return indexManager.with(name, indexLoader(), (index) -> { + return indexManager.with(name, (index) -> { return index.info(); }); } @@ -101,7 +87,7 @@ public final class IndexResource { @POST public void setIndexInfo(@PathParam("name") String name, @NotNull @Valid IndexInfoRequest request) throws Exception { - indexManager.with(name, indexLoader(), (index) -> { + indexManager.with(name, (index) -> { if (request.getMatchUpdateSeq().isPresent() && request.getUpdateSeq().isPresent()) { index.setUpdateSeq( @@ -121,7 +107,7 @@ public final class IndexResource { @Path("/search") public SearchResults searchIndex(@PathParam("name") String name, @NotNull @Valid SearchRequest request) throws Exception { - return indexManager.with(name, indexLoader(), (index) -> { + return indexManager.with(name, (index) -> { return index.search(request); }); } @@ -133,36 +119,9 @@ public final class IndexResource { @PathParam("docId") String docId, @NotNull @Valid DocumentUpdateRequest request) throws Exception { - indexManager.with(name, indexLoader(), (index) -> { + indexManager.with(name, (index) -> { index.update(docId, request); return null; }); } - - private IndexLoader indexLoader() { - return (path, indexDefinition) -> { - final Analyzer analyzer = Lucene9AnalyzerFactory.fromDefinition(indexDefinition); - final Directory dir = new DirectIODirectory(FSDirectory.open(path.resolve("9"))); - final IndexWriterConfig config = new IndexWriterConfig(analyzer); - config.setUseCompoundFile(false); - final IndexWriter writer = new IndexWriter(dir, config); - final long updateSeq = getSeq(writer, "update_seq"); - final long purgeSeq = getSeq(writer, "purge_seq"); - final SearcherManager searcherManager = new SearcherManager(writer, searcherFactory); - return new Lucene9Index(analyzer, writer, updateSeq, purgeSeq, searcherManager); - }; - } - - private static long getSeq(final IndexWriter writer, final String key) throws IOException { - final Iterable<Map.Entry<String, String>> commitData = writer.getLiveCommitData(); - if (commitData == null) { - return 0L; - } - for (Map.Entry<String, String> entry : commitData) { - if (entry.getKey().equals(key)) { - return Long.parseLong(entry.getValue()); - } - } - return 0L; - } } diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java index c71c281fa..38a34d548 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java @@ -34,12 +34,15 @@ public class IndexHealthCheckTest { manager.setObjectMapper(new ObjectMapper()); manager.setMetricRegistry(new MetricRegistry()); manager.setRootDir(tempDir); + manager.setSearcherFactory(new SearcherFactory()); manager.setScheduler(scheduler); manager.start(); + try { - var resource = new IndexResource(manager, new SearcherFactory()); + var resource = new IndexResource(manager); var check = new IndexHealthCheck(resource); assertTrue(check.check().isHealthy()); + } finally { scheduler.shutdown(); manager.stop(); diff --git a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java index 98d752fa8..ece5fb36f 100644 --- a/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java +++ b/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene9/Lucene9IndexTest.java @@ -33,7 +33,6 @@ import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; import org.apache.couchdb.nouveau.api.StringField; import org.apache.couchdb.nouveau.core.Index; -import org.apache.couchdb.nouveau.core.IndexLoader; import org.apache.couchdb.nouveau.core.UpdatesOutOfOrderException; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.index.IndexWriter; @@ -50,7 +49,13 @@ public class Lucene9IndexTest { protected final Index setup(final Path path) throws IOException { final IndexDefinition indexDefinition = new IndexDefinition(); indexDefinition.setDefaultAnalyzer("standard"); - return indexLoader().apply(path, indexDefinition); + final Analyzer analyzer = Lucene9AnalyzerFactory.fromDefinition(indexDefinition); + final Directory dir = new DirectIODirectory(FSDirectory.open(path)); + final IndexWriterConfig config = new IndexWriterConfig(analyzer); + config.setUseCompoundFile(false); + final IndexWriter writer = new IndexWriter(dir, config); + final SearcherManager searcherManager = new SearcherManager(writer, null); + return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager); } protected final void cleanup(final Index index) throws IOException { @@ -235,16 +240,4 @@ public class Lucene9IndexTest { cleanup(index); } } - - protected IndexLoader indexLoader() { - return (path, indexDefinition) -> { - final Analyzer analyzer = Lucene9AnalyzerFactory.fromDefinition(indexDefinition); - final Directory dir = new DirectIODirectory(FSDirectory.open(path)); - final IndexWriterConfig config = new IndexWriterConfig(analyzer); - config.setUseCompoundFile(false); - final IndexWriter writer = new IndexWriter(dir, config); - final SearcherManager searcherManager = new SearcherManager(writer, null); - return new Lucene9Index(analyzer, writer, 0L, 0L, searcherManager); - }; - } }
