This is an automated email from the ASF dual-hosted git repository. yashmayya pushed a commit to branch release-1.5.0-rc in repository https://gitbox.apache.org/repos/asf/pinot.git
commit fe79b9ad43e27b3744712eea8da96487a022723f Author: Anurag Rai <[email protected]> AuthorDate: Thu Apr 2 05:40:30 2026 +0530 Fix Broken Lucene Query Tracking and Cancellation for OOM Protection (#17884) --- .../MultiColumnRealtimeLuceneTextIndex.java | 155 +++++----- .../RealtimeLuceneDocIdCollector.java | 21 ++ .../invertedindex/RealtimeLuceneTextIndex.java | 140 ++++----- .../RealtimeLuceneTextIndexSearcherPool.java | 9 +- .../realtime/impl/vector/MutableVectorIndex.java | 34 ++- .../index/readers/text/LuceneDocIdCollector.java | 19 ++ .../index/readers/vector/HnswDocIdCollector.java | 10 + .../invertedindex/LuceneMutableTextIndexTest.java | 24 +- ...ealtimeLuceneTextIndexResourceTrackingTest.java | 323 +++++++++++++++++++++ .../apache/pinot/spi/query/QueryThreadContext.java | 19 +- 10 files changed, 601 insertions(+), 153 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java index 783d7daa3d5..884227d31ec 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java @@ -24,7 +24,6 @@ import java.io.File; import java.lang.reflect.Constructor; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import javax.annotation.Nullable; import org.apache.lucene.analysis.Analyzer; @@ -174,30 +173,12 @@ public class MultiColumnRealtimeLuceneTextIndex implements MultiColumnTextIndexR LuceneTextIndexUtils.LuceneTextIndexOptions options) { MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); - // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter - // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed - // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully. + // Search is executed in SEARCHER_POOL which is wrapped with contextAwareExecutorService(executor, false). + // This propagates QueryThreadContext for CPU/memory tracking without registering the task for cancellation, + // preventing Thread.interrupt() during Lucene search which could corrupt FSDirectory. // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 - Callable<MutableRoaringBitmap> searchCallable = () -> { - IndexSearcher indexSearcher = null; - try { - Query query = LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, column, _analyzer); - indexSearcher = _searcherManager.acquire(); - indexSearcher.search(query, docIDCollector); - return getPinotDocIds(indexSearcher, docIDs); - } finally { - try { - if (indexSearcher != null) { - _searcherManager.release(indexSearcher); - } - } catch (Exception e) { - LOGGER.error( - "Failed while releasing the searcher manager for realtime text index for columns {}, exception {}", - _columns, e.getMessage()); - } - } - }; - Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit(searchCallable); + Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit( + () -> executeSearchWithOptions(column, actualQuery, options, docIDCollector)); try { return searchFuture.get(); } catch (InterruptedException e) { @@ -210,64 +191,37 @@ public class MultiColumnRealtimeLuceneTextIndex implements MultiColumnTextIndexR } } - private MutableRoaringBitmap getDocIdsWithoutOptions(String column, String searchQuery) { - MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); - RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); - // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter - // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed - // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully. - // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 - Callable<MutableRoaringBitmap> searchCallable = () -> { - IndexSearcher indexSearcher = null; - try { - - // Lucene query parsers are generally stateful and a new instance must be created per query. - Constructor<QueryParserBase> queryParserClassConstructor = _queryParserClassConstructor; - boolean enablePrefixSuffixMatchingInPhraseQueries = _enablePrefixSuffixMatchingInPhraseQueries; - MultiColumnLuceneTextIndexReader.ColumnConfig columnConfig = _perColumnConfigs.get(column); - if (columnConfig != null) { - if (columnConfig.getQueryParserClassConstructor() != null) { - queryParserClassConstructor = columnConfig.getQueryParserClassConstructor(); - } - if (columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries() != null) { - enablePrefixSuffixMatchingInPhraseQueries = columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries(); - } - } - - QueryParserBase parser = queryParserClassConstructor.newInstance(column, _analyzer); - if (enablePrefixSuffixMatchingInPhraseQueries) { - // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use - // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. - // Additionally, it does not support wildcards that span across terms. - // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 - // Workaround: Use a custom query parser that correctly implements wildcard searches. - parser.setAllowLeadingWildcard(true); - } - Query query = parser.parse(searchQuery); - if (enablePrefixSuffixMatchingInPhraseQueries) { - // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use - // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. - // Additionally, it does not support wildcards that span across terms. - // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 - // Workaround: Use a custom query parser that correctly implements wildcard searches. - query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query); - } - indexSearcher = _searcherManager.acquire(); - indexSearcher.search(query, docIDCollector); - return getPinotDocIds(indexSearcher, docIDs); - } finally { + private MutableRoaringBitmap executeSearchWithOptions(String column, String actualQuery, + LuceneTextIndexUtils.LuceneTextIndexOptions options, RealtimeLuceneDocIdCollector docIDCollector) + throws Exception { + IndexSearcher indexSearcher = null; + try { + Query query = LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, column, _analyzer); + indexSearcher = _searcherManager.acquire(); + indexSearcher.search(query, docIDCollector); + return getPinotDocIds(indexSearcher, docIDCollector.getDocIds()); + } finally { + if (indexSearcher != null) { try { - if (indexSearcher != null) { - _searcherManager.release(indexSearcher); - } + _searcherManager.release(indexSearcher); } catch (Exception e) { LOGGER.error( "Failed while releasing the searcher manager for realtime text index for columns {}, exception {}", _columns, e.getMessage()); } } - }; - Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit(searchCallable); + } + } + + private MutableRoaringBitmap getDocIdsWithoutOptions(String column, String searchQuery) { + MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); + RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); + // Search is executed in SEARCHER_POOL which is wrapped with contextAwareExecutorService(executor, false). + // This propagates QueryThreadContext for CPU/memory tracking without registering the task for cancellation, + // preventing Thread.interrupt() during Lucene search which could corrupt FSDirectory. + // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 + Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit( + () -> executeSearchWithoutOptions(column, searchQuery, docIDCollector)); try { return searchFuture.get(); } catch (InterruptedException e) { @@ -280,6 +234,57 @@ public class MultiColumnRealtimeLuceneTextIndex implements MultiColumnTextIndexR } } + private MutableRoaringBitmap executeSearchWithoutOptions(String column, String searchQuery, + RealtimeLuceneDocIdCollector docIDCollector) throws Exception { + IndexSearcher indexSearcher = null; + try { + // Lucene query parsers are generally stateful and a new instance must be created per query. + Constructor<QueryParserBase> queryParserClassConstructor = _queryParserClassConstructor; + boolean enablePrefixSuffixMatchingInPhraseQueries = _enablePrefixSuffixMatchingInPhraseQueries; + MultiColumnLuceneTextIndexReader.ColumnConfig columnConfig = _perColumnConfigs.get(column); + if (columnConfig != null) { + if (columnConfig.getQueryParserClassConstructor() != null) { + queryParserClassConstructor = columnConfig.getQueryParserClassConstructor(); + } + if (columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries() != null) { + enablePrefixSuffixMatchingInPhraseQueries = columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries(); + } + } + + QueryParserBase parser = queryParserClassConstructor.newInstance(column, _analyzer); + if (enablePrefixSuffixMatchingInPhraseQueries) { + // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use + // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. + // Additionally, it does not support wildcards that span across terms. + // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 + // Workaround: Use a custom query parser that correctly implements wildcard searches. + parser.setAllowLeadingWildcard(true); + } + Query query = parser.parse(searchQuery); + if (enablePrefixSuffixMatchingInPhraseQueries) { + // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use + // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. + // Additionally, it does not support wildcards that span across terms. + // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 + // Workaround: Use a custom query parser that correctly implements wildcard searches. + query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query); + } + indexSearcher = _searcherManager.acquire(); + indexSearcher.search(query, docIDCollector); + return getPinotDocIds(indexSearcher, docIDCollector.getDocIds()); + } finally { + if (indexSearcher != null) { + try { + _searcherManager.release(indexSearcher); + } catch (Exception e) { + LOGGER.error( + "Failed while releasing the searcher manager for realtime text index for columns {}, exception {}", + _columns, e.getMessage()); + } + } + } + } + // TODO: Optimize this similar to how we have done for offline/completed segments. // Pre-built mapping will not work for realtime. We need to build an on-the-fly cache // as queries are coming in. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java index 2b46061e9eb..4c64c703ee8 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java @@ -20,10 +20,12 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.io.IOException; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.Collector; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -51,6 +53,8 @@ public class RealtimeLuceneDocIdCollector implements Collector { @Override public LeafCollector getLeafCollector(LeafReaderContext context) { return new LeafCollector() { + // Counter for periodic termination check + private int _numDocsCollected = 0; @Override public void setScorer(Scorable scorer) @@ -64,6 +68,19 @@ public class RealtimeLuceneDocIdCollector implements Collector { if (_shouldCancel) { throw new RuntimeException("TEXT_MATCH query was cancelled"); } + try { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically( + _numDocsCollected++, "RealtimeLuceneDocIdCollector"); + } catch (RuntimeException e) { + // CollectionTerminatedException - Lucene's IndexSearcher.search() specially handles this exception + // to gracefully stop document collection without treating it as an error. + // When checkTerminationAndSampleUsagePeriodically() throws + // TerminationException (for OOM/timeout), it's already stored in QueryExecutionContext._terminateException + // before being thrown. After search completes, higher-level code retrieves the actual error via + // QueryThreadContext.getTerminateException() to include proper error details in query response. + throw new CollectionTerminatedException(); + } + // Compute the absolute lucene docID across sub-indexes as doc that is passed is relative to the current reader _docIds.add(context.docBase + doc); } @@ -73,4 +90,8 @@ public class RealtimeLuceneDocIdCollector implements Collector { public void markShouldCancel() { _shouldCancel = true; } + + public MutableRoaringBitmap getDocIds() { + return _docIds; + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java index aa09fdd2d50..47b7ef75f18 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java @@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.io.File; import java.lang.reflect.Constructor; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import javax.annotation.Nullable; import org.apache.lucene.analysis.Analyzer; @@ -157,30 +156,12 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { LuceneTextIndexUtils.LuceneTextIndexOptions options) { MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); - // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter - // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed - // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully. + // Search is executed in SEARCHER_POOL which is wrapped with contextAwareExecutorService(executor, false). + // This propagates QueryThreadContext for CPU/memory tracking without registering the task for cancellation, + // preventing Thread.interrupt() during Lucene search which could corrupt FSDirectory. // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 - Callable<MutableRoaringBitmap> searchCallable = () -> { - IndexSearcher indexSearcher = null; - try { - Query query = LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, _column, _analyzer); - indexSearcher = _searcherManager.acquire(); - indexSearcher.search(query, docIDCollector); - return getPinotDocIds(indexSearcher, docIDs); - } finally { - try { - if (indexSearcher != null) { - _searcherManager.release(indexSearcher); - } - } catch (Exception e) { - LOGGER.error( - "Failed while releasing the searcher manager for realtime text index for column {}, exception {}", - _column, e.getMessage()); - } - } - }; - Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit(searchCallable); + Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit( + () -> executeSearchWithOptions(actualQuery, options, docIDCollector)); try { return searchFuture.get(); } catch (InterruptedException e) { @@ -193,51 +174,38 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { } } - private MutableRoaringBitmap getDocIdsWithoutOptions(String searchQuery) { - MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); - RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); - // A thread interrupt during indexSearcher.search() can break the underlying FSDirectory used by the IndexWriter - // which the SearcherManager is created with. To ensure the index is never corrupted the search is executed - // in a child thread and the interrupt is handled in the current thread by canceling the search gracefully. - // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 - Callable<MutableRoaringBitmap> searchCallable = () -> { - IndexSearcher indexSearcher = null; - try { - // Lucene query parsers are generally stateful and a new instance must be created per query. - QueryParserBase parser = _queryParserClassConstructor.newInstance(_column, _analyzer); - if (_enablePrefixSuffixMatchingInPhraseQueries) { - // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use - // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. - // Additionally, it does not support wildcards that span across terms. - // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 - // Workaround: Use a custom query parser that correctly implements wildcard searches. - parser.setAllowLeadingWildcard(true); - } - Query query = parser.parse(searchQuery); - if (_enablePrefixSuffixMatchingInPhraseQueries) { - // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use - // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. - // Additionally, it does not support wildcards that span across terms. - // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 - // Workaround: Use a custom query parser that correctly implements wildcard searches. - query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query); - } - indexSearcher = _searcherManager.acquire(); - indexSearcher.search(query, docIDCollector); - return getPinotDocIds(indexSearcher, docIDs); - } finally { + /** + * Executes Lucene search with options. Extracted to a separate method for clarity. + */ + private MutableRoaringBitmap executeSearchWithOptions(String actualQuery, + LuceneTextIndexUtils.LuceneTextIndexOptions options, RealtimeLuceneDocIdCollector docIDCollector) + throws Exception { + IndexSearcher indexSearcher = null; + try { + Query query = LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, _column, _analyzer); + indexSearcher = _searcherManager.acquire(); + indexSearcher.search(query, docIDCollector); + return getPinotDocIds(indexSearcher, docIDCollector.getDocIds()); + } finally { + if (indexSearcher != null) { try { - if (indexSearcher != null) { - _searcherManager.release(indexSearcher); - } + _searcherManager.release(indexSearcher); } catch (Exception e) { - LOGGER.error( - "Failed while releasing the searcher manager for realtime text index for column {}, exception {}", - _column, e.getMessage()); + LOGGER.error("Failed while releasing searcher manager for column {}", _column, e); } } - }; - Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit(searchCallable); + } + } + + private MutableRoaringBitmap getDocIdsWithoutOptions(String searchQuery) { + MutableRoaringBitmap docIDs = new MutableRoaringBitmap(); + RealtimeLuceneDocIdCollector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs); + // Search is executed in SEARCHER_POOL which is wrapped with contextAwareExecutorService(executor, false). + // This propagates QueryThreadContext for CPU/memory tracking without registering the task for cancellation, + // preventing Thread.interrupt() during Lucene search which could corrupt FSDirectory. + // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 + Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit( + () -> executeSearchWithoutOptions(searchQuery, docIDCollector)); try { return searchFuture.get(); } catch (InterruptedException e) { @@ -250,6 +218,48 @@ public class RealtimeLuceneTextIndex implements MutableTextIndex { } } + /** + * Executes Lucene search without options. + */ + private MutableRoaringBitmap executeSearchWithoutOptions(String searchQuery, + RealtimeLuceneDocIdCollector docIDCollector) throws Exception { + IndexSearcher indexSearcher = null; + try { + // Lucene query parsers are generally stateful and a new instance must be created per query. + QueryParserBase parser = _queryParserClassConstructor.newInstance(_column, _analyzer); + if (_enablePrefixSuffixMatchingInPhraseQueries) { + // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use + // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. + // Additionally, it does not support wildcards that span across terms. + // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 + // Workaround: Use a custom query parser that correctly implements wildcard searches. + parser.setAllowLeadingWildcard(true); + } + Query query = parser.parse(searchQuery); + if (_enablePrefixSuffixMatchingInPhraseQueries) { + // Note: Lucene's built-in QueryParser has limited wildcard functionality in phrase queries. It does not use + // the provided analyzer when wildcards are present, defaulting to the default analyzer for tokenization. + // Additionally, it does not support wildcards that span across terms. + // For more details, see: https://github.com/elastic/elasticsearch/issues/22540 + // Workaround: Use a custom query parser that correctly implements wildcard searches. + query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query); + } + indexSearcher = _searcherManager.acquire(); + indexSearcher.search(query, docIDCollector); + return getPinotDocIds(indexSearcher, docIDCollector.getDocIds()); + } finally { + if (indexSearcher != null) { + try { + _searcherManager.release(indexSearcher); + } catch (Exception e) { + LOGGER.error( + "Failed while releasing the searcher manager for realtime text index for column {}, exception {}", + _column, e.getMessage()); + } + } + } + } + // TODO: Optimize this similar to how we have done for offline/completed segments. // Pre-built mapping will not work for realtime. We need to build an on-the-fly cache // as queries are coming in. diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java index 9eb1126f4a0..edb8da4e2bd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.realtime.impl.invertedindex; import java.util.concurrent.ExecutorService; import org.apache.pinot.common.utils.ScalingThreadPoolExecutor; +import org.apache.pinot.spi.query.QueryThreadContext; /** @@ -27,13 +28,19 @@ import org.apache.pinot.common.utils.ScalingThreadPoolExecutor; * The pool max size is equivalent to pinot.query.scheduler.query_worker_threads to ensure each worker thread can have * an accompanying Lucene searcher thread if needed. init() is called in BaseServerStarter to avoid creating a * dependency on pinot-core. + * + * The executor is wrapped with QueryThreadContext.contextAwareExecutorService(executor, false) to propagate + * QueryThreadContext for CPU/memory tracking, but WITHOUT registering tasks for cancellation. This prevents + * Thread.interrupt() during Lucene search which could corrupt FSDirectory used by IndexWriter. + * See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 */ public class RealtimeLuceneTextIndexSearcherPool { private static RealtimeLuceneTextIndexSearcherPool _singletonInstance; private static ExecutorService _executorService; private RealtimeLuceneTextIndexSearcherPool(int size) { - _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500); + ExecutorService baseExecutor = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500); + _executorService = QueryThreadContext.contextAwareExecutorService(baseExecutor, false); } public static RealtimeLuceneTextIndexSearcherPool getInstance() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java index 53fa628a5a0..7987f266e42 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java @@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.vector; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.concurrent.Future; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.lucene.document.Document; @@ -33,6 +34,7 @@ import org.apache.lucene.search.KnnFloatVectorQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.FSDirectory; +import org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool; import org.apache.pinot.segment.local.segment.creator.impl.vector.XKnnFloatVectorField; import org.apache.pinot.segment.local.segment.store.VectorIndexUtils; import org.apache.pinot.segment.spi.V1Constants; @@ -51,6 +53,8 @@ import org.slf4j.LoggerFactory; */ public class MutableVectorIndex implements VectorIndexReader, MutableIndex { private static final Logger LOGGER = LoggerFactory.getLogger(MutableVectorIndex.class); + private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL = + RealtimeLuceneTextIndexSearcherPool.getInstance(); public static final String VECTOR_INDEX_DOC_ID_COLUMN_NAME = "DocID"; public static final long DEFAULT_COMMIT_INTERVAL_MS = 10_000L; public static final long DEFAULT_COMMIT_DOCS = 1000L; @@ -125,16 +129,30 @@ public class MutableVectorIndex implements VectorIndexReader, MutableIndex { @Override public MutableRoaringBitmap getDocIds(float[] vector, int topK) { - MutableRoaringBitmap docIds; + // Search is executed in SEARCHER_POOL which is wrapped with contextAwareExecutorService(executor, false). + // This propagates QueryThreadContext for CPU/memory tracking without registering the task for cancellation, + // preventing Thread.interrupt() during Lucene search which could corrupt FSDirectory. + // See https://github.com/apache/lucene/issues/3315 and https://github.com/apache/lucene/issues/9309 + Future<MutableRoaringBitmap> searchFuture = SEARCHER_POOL.getExecutorService().submit( + () -> executeVectorSearch(vector, topK)); try { - IndexSearcher indexSearcher = new IndexSearcher(DirectoryReader.open(_indexDirectory)); - Query query = new KnnFloatVectorQuery(_vectorColumn, vector, topK); - docIds = new MutableRoaringBitmap(); - TopDocs search = indexSearcher.search(query, topK); - Arrays.stream(search.scoreDocs).map(scoreDoc -> scoreDoc.doc).forEach(docIds::add); - } catch (IOException e) { - throw new RuntimeException(e); + return searchFuture.get(); + } catch (InterruptedException e) { + searchFuture.cancel(false); + throw new RuntimeException("VECTOR_SIMILARITY query interrupted for segment " + _segmentName + + " column " + _vectorColumn, e); + } catch (Exception e) { + throw new RuntimeException("Failed while searching vector index for segment " + _segmentName + + " column " + _vectorColumn, e); } + } + + private MutableRoaringBitmap executeVectorSearch(float[] vector, int topK) throws IOException { + IndexSearcher indexSearcher = new IndexSearcher(DirectoryReader.open(_indexDirectory)); + Query query = new KnnFloatVectorQuery(_vectorColumn, vector, topK); + MutableRoaringBitmap docIds = new MutableRoaringBitmap(); + TopDocs search = indexSearcher.search(query, topK); + Arrays.stream(search.scoreDocs).map(scoreDoc -> scoreDoc.doc).forEach(docIds::add); return docIds; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java index e5d487dd602..e27914d6da7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java @@ -20,10 +20,12 @@ package org.apache.pinot.segment.local.segment.index.readers.text; import java.io.IOException; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.Collector; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -60,6 +62,8 @@ public class LuceneDocIdCollector implements Collector { public LeafCollector getLeafCollector(LeafReaderContext context) { return new LeafCollector() { + private int _numDocsCollected = 0; + @Override public void setScorer(Scorable scorer) throws IOException { @@ -69,6 +73,21 @@ public class LuceneDocIdCollector implements Collector { @Override public void collect(int doc) throws IOException { + try { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically( + _numDocsCollected++, "LuceneDocIdCollector"); + } catch (RuntimeException e) { + // Why CollectionTerminatedException: Lucene's IndexSearcher.search() specially handles this exception + // to gracefully stop document collection without treating it as an error. If we let the original + // TerminationException propagate, Lucene would wrap it and log errors unnecessarily. + // + // How termination info is preserved: When checkTerminationAndSampleUsagePeriodically() throws + // TerminationException (for OOM/timeout), it's already stored in QueryExecutionContext._terminateException + // before being thrown. After search completes, higher-level code retrieves the actual error via + // QueryThreadContext.getTerminateException() to include proper error details in query response. + throw new CollectionTerminatedException(); + } + // Compute the absolute lucene docID across // sub-indexes because that's how the lookup table in docIdTranslator is built _docIds.add(_docIdTranslator.getPinotDocId(context.docBase + doc)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java index 788e6f6dc8d..358f95af636 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java @@ -20,10 +20,12 @@ package org.apache.pinot.segment.local.segment.index.readers.vector; import java.io.IOException; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.Collector; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreMode; +import org.apache.pinot.spi.query.QueryThreadContext; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -59,6 +61,7 @@ public class HnswDocIdCollector implements Collector { @Override public LeafCollector getLeafCollector(LeafReaderContext context) { return new LeafCollector() { + private int _numDocsCollected = 0; @Override public void setScorer(Scorable scorer) @@ -69,6 +72,13 @@ public class HnswDocIdCollector implements Collector { @Override public void collect(int doc) throws IOException { + try { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically( + _numDocsCollected++, "HnswDocIdCollector"); + } catch (RuntimeException e) { + throw new CollectionTerminatedException(); + } + // Compute the absolute lucene docID across // sub-indexes because that's how the lookup table in docIdTranslator is built _docIds.add(_docIdTranslator.getPinotDocId(context.docBase + doc)); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java index 4dd3de5e53c..4bc21d1506a 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java @@ -32,11 +32,14 @@ import org.apache.lucene.queryparser.classic.QueryParser; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.util.TestUtils; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; @@ -52,12 +55,26 @@ public class LuceneMutableTextIndexTest { private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL = RealtimeLuceneTextIndexSearcherPool.init(1); private RealtimeLuceneTextIndex _realtimeLuceneTextIndex; + private QueryThreadContext _queryThreadContext; public LuceneMutableTextIndexTest() { RealtimeLuceneIndexRefreshManager.init(1, 10); ServerMetrics.register(mock(ServerMetrics.class)); } + @BeforeMethod + public void setUpMethod() { + _queryThreadContext = QueryThreadContext.openForSseTest(); + } + + @AfterMethod + public void tearDownMethod() { + if (_queryThreadContext != null) { + _queryThreadContext.close(); + _queryThreadContext = null; + } + } + @Test public void testDefaultAnalyzerAndDefaultQueryParser() { // Test queries with standard analyzer with default configurations used by Pinot @@ -259,9 +276,12 @@ public class LuceneMutableTextIndexTest { public void testQueryCancellationIsSuccessful() throws InterruptedException, ExecutionException { // Avoid early finalization by not using Executors.newSingleThreadExecutor (java <= 20, JDK-8145304) - ExecutorService executor = Executors.newFixedThreadPool(1); + ExecutorService baseExecutor = Executors.newFixedThreadPool(1); + // Wrap with contextAwareExecutorService to propagate QueryThreadContext to child threads + ExecutorService executor = QueryThreadContext.contextAwareExecutorService(baseExecutor); Future<MutableRoaringBitmap> res = executor.submit(() -> _realtimeLuceneTextIndex.getDocIds("/.*read.*/")); - executor.shutdownNow(); + // Shutdown the base executor to trigger interrupt on the worker thread + baseExecutor.shutdownNow(); res.get(); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java new file mode 100644 index 00000000000..145da5ea94a --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.realtime.impl.invertedindex; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder; +import org.apache.pinot.segment.spi.index.TextIndexConfig; +import org.apache.pinot.spi.accounting.QueryResourceTracker; +import org.apache.pinot.spi.accounting.ThreadAccountant; +import org.apache.pinot.spi.accounting.ThreadResourceTracker; +import org.apache.pinot.spi.exception.QueryErrorCode; +import org.apache.pinot.spi.query.QueryExecutionContext; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.util.TestUtils; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +public class RealtimeLuceneTextIndexResourceTrackingTest { + private static final File INDEX_DIR = + new File(FileUtils.getTempDirectory(), "RealtimeLuceneTextIndexResourceTrackingTest"); + private static final String TEXT_COLUMN_NAME = "textColumn"; + private static final AtomicInteger SEGMENT_COUNTER = new AtomicInteger(0); + + private RealtimeLuceneTextIndex _textIndex; + + static { + try { + RealtimeLuceneTextIndexSearcherPool.init(2); + } catch (AssertionError e) { + // Already initialized + } + RealtimeLuceneIndexRefreshManager.init(1, 10); + ServerMetrics.register(mock(ServerMetrics.class)); + } + + @BeforeClass + public void setUpClass() throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + FileUtils.forceMkdir(INDEX_DIR); + RealtimeLuceneIndexRefreshManager.getInstance().reset(); + } + + @AfterClass + public void tearDownClass() throws Exception { + FileUtils.deleteDirectory(INDEX_DIR); + } + + @BeforeMethod + public void setUp() { + TextIndexConfig config = new TextIndexConfigBuilder().build(); + String segmentName = "segment_" + SEGMENT_COUNTER.getAndIncrement() + "__0__1__20240101T0000Z"; + _textIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, segmentName, config); + + for (int i = 0; i < 100; i++) { + _textIndex.add("document number " + i + " with searchable content"); + } + _textIndex.commit(); + + // Wait for index refresh + TestUtils.waitForCondition(aVoid -> { + try { + return _textIndex.getSearcherManager().isSearcherCurrent(); + } catch (IOException e) { + return false; + } + }, 5000, "Index refresh timeout"); + } + + @AfterMethod + public void tearDown() { + if (_textIndex != null) { + _textIndex.close(); + _textIndex = null; + } + } + + @Test + public void testSearcherThreadRegisteredWithAccountant() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + MutableRoaringBitmap result = _textIndex.getDocIds("searchable"); + + assertTrue(result.getCardinality() > 0, "Search should return results"); + + assertTrue(accountant.getSetupTaskCount() >= 2, + "setupTask should be called for both parent and searcher threads, got: " + accountant.getSetupTaskCount()); + } + } + + @Test + public void testResourceUsageSampledDuringSearch() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + // Perform search with many matching docs to trigger periodic sampling + MutableRoaringBitmap result = _textIndex.getDocIds("document"); + + assertTrue(result.getCardinality() > 0, "Search should return results"); + } + + // After context close, clear should have been called + assertTrue(accountant.getClearCount() >= 1, "clear() should be called on context close"); + } + + @Test + public void testQueryTerminationDetectedByCollector() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + + // Add many more documents for longer search + TextIndexConfig config = new TextIndexConfigBuilder().build(); + String segmentName = "segment_termination_" + SEGMENT_COUNTER.getAndIncrement() + "__0__1__20240101T0000Z"; + RealtimeLuceneTextIndex largeIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, segmentName, config); + + try { + for (int i = 0; i < 50000; i++) { + largeIndex.add("document number " + i + " with lots of searchable content for termination test"); + } + largeIndex.commit(); + + TestUtils.waitForCondition(aVoid -> { + try { + return largeIndex.getSearcherManager().isSearcherCurrent(); + } catch (IOException e) { + return false; + } + }, 10000, "Index refresh timeout"); + + ExecutorService baseExecutor = Executors.newSingleThreadExecutor(); + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + // Wrap with contextAwareExecutorService but don't register for cancellation (false). + ExecutorService executor = QueryThreadContext.contextAwareExecutorService(baseExecutor, false); + + // Start a search that will take some time + Future<MutableRoaringBitmap> searchFuture = executor.submit( + () -> largeIndex.getDocIds("/.*searchable.*/")); + + // Give the search a moment to start + Thread.sleep(50); + + // Terminate the query - this sets the termination flag that the collector will detect + executionContext.terminate(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED, "OOM test"); + + try { + searchFuture.get(); + // Search might complete before termination is detected - that's OK for this test + } catch (ExecutionException e) { + // Expected - search was terminated via collector detecting the termination flag + assertTrue(e.getCause() instanceof RuntimeException || e.getCause() instanceof Error, + "Should throw RuntimeException or Error on termination, got: " + e.getCause().getClass().getName()); + } catch (CancellationException e) { + // Also acceptable if future was cancelled + } + } finally { + // Ensure executor is fully shut down before closing the index to prevent + baseExecutor.shutdown(); + baseExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } finally { + largeIndex.close(); + } + } + + @Test(expectedExceptions = ExecutionException.class, + expectedExceptionsMessageRegExp = ".*TEXT_MATCH query interrupted.*") + public void testInterruptTriggersShouldCancel() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + ExecutorService baseExecutor = Executors.newFixedThreadPool(1); + ExecutorService executor = QueryThreadContext.contextAwareExecutorService(baseExecutor); + Future<MutableRoaringBitmap> future = executor.submit(() -> _textIndex.getDocIds("/.*content.*/")); + baseExecutor.shutdownNow(); + future.get(); + } + } + + @Test + public void testConcurrentSearchesTrackedIndependently() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + // Wrap with contextAwareExecutorService to propagate QueryThreadContext to child threads + ExecutorService baseExecutor = Executors.newFixedThreadPool(3); + ExecutorService executor = QueryThreadContext.contextAwareExecutorService(baseExecutor); + + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + Future<MutableRoaringBitmap> future1 = executor.submit(() -> _textIndex.getDocIds("document")); + Future<MutableRoaringBitmap> future2 = executor.submit(() -> _textIndex.getDocIds("number")); + Future<MutableRoaringBitmap> future3 = executor.submit(() -> _textIndex.getDocIds("searchable")); + + MutableRoaringBitmap result1 = future1.get(); + MutableRoaringBitmap result2 = future2.get(); + MutableRoaringBitmap result3 = future3.get(); + + assertTrue(result1.getCardinality() > 0); + assertTrue(result2.getCardinality() > 0); + assertTrue(result3.getCardinality() > 0); + + // Parent thread + 3 executor threads + their searcher threads = multiple setupTask calls + // Due to thread pool reuse and timing, we expect at least 4 (1 parent + 3 searches) + assertTrue(accountant.getSetupTaskCount() >= 4, + "Multiple searches should register threads, got: " + accountant.getSetupTaskCount()); + } + + baseExecutor.shutdown(); + } + + @Test + public void testExecutionContextSharedWithSearcherThread() throws Exception { + TrackingAccountant accountant = new TrackingAccountant(); + QueryExecutionContext executionContext = QueryExecutionContext.forSseTest(); + + try (QueryThreadContext ignored = QueryThreadContext.open(executionContext, accountant)) { + _textIndex.getDocIds("document"); + + // Verify all registered contexts share the same executionContext + for (QueryThreadContext ctx : accountant.getRegisteredContexts()) { + assertEquals(ctx.getExecutionContext(), executionContext, + "All thread contexts should share the same execution context"); + } + } + } + + /** + * A ThreadAccountant implementation that tracks calls for testing purposes. + */ + private static class TrackingAccountant implements ThreadAccountant { + private final AtomicInteger _setupTaskCount = new AtomicInteger(0); + private final AtomicInteger _sampleUsageCount = new AtomicInteger(0); + private final AtomicInteger _clearCount = new AtomicInteger(0); + private final ConcurrentHashMap<Thread, QueryThreadContext> _registeredContexts = new ConcurrentHashMap<>(); + + @Override + public void setupTask(QueryThreadContext threadContext) { + _setupTaskCount.incrementAndGet(); + _registeredContexts.put(Thread.currentThread(), threadContext); + } + + @Override + public void sampleUsage() { + _sampleUsageCount.incrementAndGet(); + } + + @Override + public void clear() { + _clearCount.incrementAndGet(); + _registeredContexts.remove(Thread.currentThread()); + } + + @Override + public void updateUntrackedResourceUsage(String identifier, long cpuTimeNs, long allocatedBytes, + org.apache.pinot.spi.accounting.TrackingScope trackingScope) { + } + + @Override + public Collection<? extends ThreadResourceTracker> getThreadResources() { + return Collections.emptyList(); + } + + @Override + public Map<String, ? extends QueryResourceTracker> getQueryResources() { + return Collections.emptyMap(); + } + + public int getSetupTaskCount() { + return _setupTaskCount.get(); + } + + public int getSampleUsageCount() { + return _sampleUsageCount.get(); + } + + public int getClearCount() { + return _clearCount.get(); + } + + public Collection<QueryThreadContext> getRegisteredContexts() { + return _registeredContexts.values(); + } + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java index 2feaa2fbee2..ef3823578fe 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.pinot.spi.accounting.ThreadAccountant; @@ -178,9 +180,22 @@ public class QueryThreadContext implements AutoCloseable { } /// Returns a new [ExecutorService] whose tasks will be executed with the [QueryThreadContext] initialized with the - /// state of the thread submitting the tasks. + /// state of the thread submitting the tasks. Tasks are registered for cancellation when the query is terminated. public static ExecutorService contextAwareExecutorService(ExecutorService executorService) { - return new DecoratorExecutorService(executorService, future -> get().getExecutionContext().addTask(future)) { + return contextAwareExecutorService(executorService, true); + } + + /// Returns a new [ExecutorService] whose tasks will be executed with the [QueryThreadContext] initialized with the + /// state of the thread submitting the tasks. + /// @param registerTaskForCancellation If true, tasks are registered with QueryExecutionContext.addTask() and will + /// be cancelled (via Thread.interrupt()) when the query is terminated. Set to + /// false for executors where interrupt could cause corruption + public static ExecutorService contextAwareExecutorService(ExecutorService executorService, + boolean registerTaskForCancellation) { + Consumer<Future<?>> onSubmit = registerTaskForCancellation + ? future -> get().getExecutionContext().addTask(future) + : null; + return new DecoratorExecutorService(executorService, onSubmit) { @Override protected <T> Callable<T> decorate(Callable<T> task) { QueryThreadContext parentThreadContext = get(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
