This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch release-1.5.0-rc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit fe79b9ad43e27b3744712eea8da96487a022723f
Author: Anurag Rai <[email protected]>
AuthorDate: Thu Apr 2 05:40:30 2026 +0530

    Fix Broken Lucene Query Tracking and Cancellation for OOM Protection 
(#17884)
---
 .../MultiColumnRealtimeLuceneTextIndex.java        | 155 +++++-----
 .../RealtimeLuceneDocIdCollector.java              |  21 ++
 .../invertedindex/RealtimeLuceneTextIndex.java     | 140 ++++-----
 .../RealtimeLuceneTextIndexSearcherPool.java       |   9 +-
 .../realtime/impl/vector/MutableVectorIndex.java   |  34 ++-
 .../index/readers/text/LuceneDocIdCollector.java   |  19 ++
 .../index/readers/vector/HnswDocIdCollector.java   |  10 +
 .../invertedindex/LuceneMutableTextIndexTest.java  |  24 +-
 ...ealtimeLuceneTextIndexResourceTrackingTest.java | 323 +++++++++++++++++++++
 .../apache/pinot/spi/query/QueryThreadContext.java |  19 +-
 10 files changed, 601 insertions(+), 153 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java
index 783d7daa3d5..884227d31ec 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/MultiColumnRealtimeLuceneTextIndex.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.lucene.analysis.Analyzer;
@@ -174,30 +173,12 @@ public class MultiColumnRealtimeLuceneTextIndex 
implements MultiColumnTextIndexR
       LuceneTextIndexUtils.LuceneTextIndexOptions options) {
     MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
     RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
-    // A thread interrupt during indexSearcher.search() can break the 
underlying FSDirectory used by the IndexWriter
-    // which the SearcherManager is created with. To ensure the index is never 
corrupted the search is executed
-    // in a child thread and the interrupt is handled in the current thread by 
canceling the search gracefully.
+    // Search is executed in SEARCHER_POOL which is wrapped with 
contextAwareExecutorService(executor, false).
+    // This propagates QueryThreadContext for CPU/memory tracking without 
registering the task for cancellation,
+    // preventing Thread.interrupt() during Lucene search which could corrupt 
FSDirectory.
     // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
-    Callable<MutableRoaringBitmap> searchCallable = () -> {
-      IndexSearcher indexSearcher = null;
-      try {
-        Query query = 
LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, column, 
_analyzer);
-        indexSearcher = _searcherManager.acquire();
-        indexSearcher.search(query, docIDCollector);
-        return getPinotDocIds(indexSearcher, docIDs);
-      } finally {
-        try {
-          if (indexSearcher != null) {
-            _searcherManager.release(indexSearcher);
-          }
-        } catch (Exception e) {
-          LOGGER.error(
-              "Failed while releasing the searcher manager for realtime text 
index for columns {}, exception {}",
-              _columns, e.getMessage());
-        }
-      }
-    };
-    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(searchCallable);
+    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(
+        () -> executeSearchWithOptions(column, actualQuery, options, 
docIDCollector));
     try {
       return searchFuture.get();
     } catch (InterruptedException e) {
@@ -210,64 +191,37 @@ public class MultiColumnRealtimeLuceneTextIndex 
implements MultiColumnTextIndexR
     }
   }
 
-  private MutableRoaringBitmap getDocIdsWithoutOptions(String column, String 
searchQuery) {
-    MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
-    RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
-    // A thread interrupt during indexSearcher.search() can break the 
underlying FSDirectory used by the IndexWriter
-    // which the SearcherManager is created with. To ensure the index is never 
corrupted the search is executed
-    // in a child thread and the interrupt is handled in the current thread by 
canceling the search gracefully.
-    // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
-    Callable<MutableRoaringBitmap> searchCallable = () -> {
-      IndexSearcher indexSearcher = null;
-      try {
-
-        // Lucene query parsers are generally stateful and a new instance must 
be created per query.
-        Constructor<QueryParserBase> queryParserClassConstructor = 
_queryParserClassConstructor;
-        boolean enablePrefixSuffixMatchingInPhraseQueries = 
_enablePrefixSuffixMatchingInPhraseQueries;
-        MultiColumnLuceneTextIndexReader.ColumnConfig columnConfig = 
_perColumnConfigs.get(column);
-        if (columnConfig != null) {
-          if (columnConfig.getQueryParserClassConstructor() != null) {
-            queryParserClassConstructor = 
columnConfig.getQueryParserClassConstructor();
-          }
-          if (columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries() != 
null) {
-            enablePrefixSuffixMatchingInPhraseQueries = 
columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries();
-          }
-        }
-
-        QueryParserBase parser = 
queryParserClassConstructor.newInstance(column, _analyzer);
-        if (enablePrefixSuffixMatchingInPhraseQueries) {
-          // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
-          // the provided analyzer when wildcards are present, defaulting to 
the default analyzer for tokenization.
-          // Additionally, it does not support wildcards that span across 
terms.
-          // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
-          // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
-          parser.setAllowLeadingWildcard(true);
-        }
-        Query query = parser.parse(searchQuery);
-        if (enablePrefixSuffixMatchingInPhraseQueries) {
-          // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
-          // the provided analyzer when wildcards are present, defaulting to 
the default analyzer for tokenization.
-          // Additionally, it does not support wildcards that span across 
terms.
-          // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
-          // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
-          query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query);
-        }
-        indexSearcher = _searcherManager.acquire();
-        indexSearcher.search(query, docIDCollector);
-        return getPinotDocIds(indexSearcher, docIDs);
-      } finally {
+  private MutableRoaringBitmap executeSearchWithOptions(String column, String 
actualQuery,
+      LuceneTextIndexUtils.LuceneTextIndexOptions options, 
RealtimeLuceneDocIdCollector docIDCollector)
+      throws Exception {
+    IndexSearcher indexSearcher = null;
+    try {
+      Query query = 
LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, column, 
_analyzer);
+      indexSearcher = _searcherManager.acquire();
+      indexSearcher.search(query, docIDCollector);
+      return getPinotDocIds(indexSearcher, docIDCollector.getDocIds());
+    } finally {
+      if (indexSearcher != null) {
         try {
-          if (indexSearcher != null) {
-            _searcherManager.release(indexSearcher);
-          }
+          _searcherManager.release(indexSearcher);
         } catch (Exception e) {
           LOGGER.error(
               "Failed while releasing the searcher manager for realtime text 
index for columns {}, exception {}",
               _columns, e.getMessage());
         }
       }
-    };
-    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(searchCallable);
+    }
+  }
+
+  private MutableRoaringBitmap getDocIdsWithoutOptions(String column, String 
searchQuery) {
+    MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
+    RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
+    // Search is executed in SEARCHER_POOL which is wrapped with 
contextAwareExecutorService(executor, false).
+    // This propagates QueryThreadContext for CPU/memory tracking without 
registering the task for cancellation,
+    // preventing Thread.interrupt() during Lucene search which could corrupt 
FSDirectory.
+    // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
+    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(
+        () -> executeSearchWithoutOptions(column, searchQuery, 
docIDCollector));
     try {
       return searchFuture.get();
     } catch (InterruptedException e) {
@@ -280,6 +234,57 @@ public class MultiColumnRealtimeLuceneTextIndex implements 
MultiColumnTextIndexR
     }
   }
 
+  private MutableRoaringBitmap executeSearchWithoutOptions(String column, 
String searchQuery,
+      RealtimeLuceneDocIdCollector docIDCollector) throws Exception {
+    IndexSearcher indexSearcher = null;
+    try {
+      // Lucene query parsers are generally stateful and a new instance must 
be created per query.
+      Constructor<QueryParserBase> queryParserClassConstructor = 
_queryParserClassConstructor;
+      boolean enablePrefixSuffixMatchingInPhraseQueries = 
_enablePrefixSuffixMatchingInPhraseQueries;
+      MultiColumnLuceneTextIndexReader.ColumnConfig columnConfig = 
_perColumnConfigs.get(column);
+      if (columnConfig != null) {
+        if (columnConfig.getQueryParserClassConstructor() != null) {
+          queryParserClassConstructor = 
columnConfig.getQueryParserClassConstructor();
+        }
+        if (columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries() != 
null) {
+          enablePrefixSuffixMatchingInPhraseQueries = 
columnConfig.getEnablePrefixSuffixMatchingInPhraseQueries();
+        }
+      }
+
+      QueryParserBase parser = queryParserClassConstructor.newInstance(column, 
_analyzer);
+      if (enablePrefixSuffixMatchingInPhraseQueries) {
+        // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
+        // the provided analyzer when wildcards are present, defaulting to the 
default analyzer for tokenization.
+        // Additionally, it does not support wildcards that span across terms.
+        // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
+        // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
+        parser.setAllowLeadingWildcard(true);
+      }
+      Query query = parser.parse(searchQuery);
+      if (enablePrefixSuffixMatchingInPhraseQueries) {
+        // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
+        // the provided analyzer when wildcards are present, defaulting to the 
default analyzer for tokenization.
+        // Additionally, it does not support wildcards that span across terms.
+        // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
+        // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
+        query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query);
+      }
+      indexSearcher = _searcherManager.acquire();
+      indexSearcher.search(query, docIDCollector);
+      return getPinotDocIds(indexSearcher, docIDCollector.getDocIds());
+    } finally {
+      if (indexSearcher != null) {
+        try {
+          _searcherManager.release(indexSearcher);
+        } catch (Exception e) {
+          LOGGER.error(
+              "Failed while releasing the searcher manager for realtime text 
index for columns {}, exception {}",
+              _columns, e.getMessage());
+        }
+      }
+    }
+  }
+
   // TODO: Optimize this similar to how we have done for offline/completed 
segments.
   // Pre-built mapping will not work for realtime. We need to build an 
on-the-fly cache
   // as queries are coming in.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
index 2b46061e9eb..4c64c703ee8 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneDocIdCollector.java
@@ -20,10 +20,12 @@ package 
org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.io.IOException;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -51,6 +53,8 @@ public class RealtimeLuceneDocIdCollector implements 
Collector {
   @Override
   public LeafCollector getLeafCollector(LeafReaderContext context) {
     return new LeafCollector() {
+      // Counter for periodic termination check
+      private int _numDocsCollected = 0;
 
       @Override
       public void setScorer(Scorable scorer)
@@ -64,6 +68,19 @@ public class RealtimeLuceneDocIdCollector implements 
Collector {
         if (_shouldCancel) {
           throw new RuntimeException("TEXT_MATCH query was cancelled");
         }
+        try {
+          QueryThreadContext.checkTerminationAndSampleUsagePeriodically(
+              _numDocsCollected++, "RealtimeLuceneDocIdCollector");
+        } catch (RuntimeException e) {
+          // CollectionTerminatedException - Lucene's IndexSearcher.search() 
specially handles this exception
+          // to gracefully stop document collection without treating it as an 
error.
+          // When checkTerminationAndSampleUsagePeriodically() throws
+          // TerminationException (for OOM/timeout), it's already stored in 
QueryExecutionContext._terminateException
+          // before being thrown. After search completes, higher-level code 
retrieves the actual error via
+          // QueryThreadContext.getTerminateException() to include proper 
error details in query response.
+          throw new CollectionTerminatedException();
+        }
+
         // Compute the absolute lucene docID across sub-indexes as doc that is 
passed is relative to the current reader
         _docIds.add(context.docBase + doc);
       }
@@ -73,4 +90,8 @@ public class RealtimeLuceneDocIdCollector implements 
Collector {
   public void markShouldCancel() {
     _shouldCancel = true;
   }
+
+  public MutableRoaringBitmap getDocIds() {
+    return _docIds;
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
index aa09fdd2d50..47b7ef75f18 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndex.java
@@ -21,7 +21,6 @@ package 
org.apache.pinot.segment.local.realtime.impl.invertedindex;
 import java.io.File;
 import java.lang.reflect.Constructor;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.lucene.analysis.Analyzer;
@@ -157,30 +156,12 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
       LuceneTextIndexUtils.LuceneTextIndexOptions options) {
     MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
     RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
-    // A thread interrupt during indexSearcher.search() can break the 
underlying FSDirectory used by the IndexWriter
-    // which the SearcherManager is created with. To ensure the index is never 
corrupted the search is executed
-    // in a child thread and the interrupt is handled in the current thread by 
canceling the search gracefully.
+    // Search is executed in SEARCHER_POOL which is wrapped with 
contextAwareExecutorService(executor, false).
+    // This propagates QueryThreadContext for CPU/memory tracking without 
registering the task for cancellation,
+    // preventing Thread.interrupt() during Lucene search which could corrupt 
FSDirectory.
     // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
-    Callable<MutableRoaringBitmap> searchCallable = () -> {
-      IndexSearcher indexSearcher = null;
-      try {
-        Query query = 
LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, 
_column, _analyzer);
-        indexSearcher = _searcherManager.acquire();
-        indexSearcher.search(query, docIDCollector);
-        return getPinotDocIds(indexSearcher, docIDs);
-      } finally {
-        try {
-          if (indexSearcher != null) {
-            _searcherManager.release(indexSearcher);
-          }
-        } catch (Exception e) {
-          LOGGER.error(
-              "Failed while releasing the searcher manager for realtime text 
index for column {}, exception {}",
-              _column, e.getMessage());
-        }
-      }
-    };
-    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(searchCallable);
+    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(
+        () -> executeSearchWithOptions(actualQuery, options, docIDCollector));
     try {
       return searchFuture.get();
     } catch (InterruptedException e) {
@@ -193,51 +174,38 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     }
   }
 
-  private MutableRoaringBitmap getDocIdsWithoutOptions(String searchQuery) {
-    MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
-    RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
-    // A thread interrupt during indexSearcher.search() can break the 
underlying FSDirectory used by the IndexWriter
-    // which the SearcherManager is created with. To ensure the index is never 
corrupted the search is executed
-    // in a child thread and the interrupt is handled in the current thread by 
canceling the search gracefully.
-    // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
-    Callable<MutableRoaringBitmap> searchCallable = () -> {
-      IndexSearcher indexSearcher = null;
-      try {
-        // Lucene query parsers are generally stateful and a new instance must 
be created per query.
-        QueryParserBase parser = 
_queryParserClassConstructor.newInstance(_column, _analyzer);
-        if (_enablePrefixSuffixMatchingInPhraseQueries) {
-          // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
-          // the provided analyzer when wildcards are present, defaulting to 
the default analyzer for tokenization.
-          // Additionally, it does not support wildcards that span across 
terms.
-          // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
-          // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
-          parser.setAllowLeadingWildcard(true);
-        }
-        Query query = parser.parse(searchQuery);
-        if (_enablePrefixSuffixMatchingInPhraseQueries) {
-          // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
-          // the provided analyzer when wildcards are present, defaulting to 
the default analyzer for tokenization.
-          // Additionally, it does not support wildcards that span across 
terms.
-          // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
-          // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
-          query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query);
-        }
-        indexSearcher = _searcherManager.acquire();
-        indexSearcher.search(query, docIDCollector);
-        return getPinotDocIds(indexSearcher, docIDs);
-      } finally {
+  /**
+   * Executes Lucene search with options. Extracted to a separate method for 
clarity.
+   */
+  private MutableRoaringBitmap executeSearchWithOptions(String actualQuery,
+      LuceneTextIndexUtils.LuceneTextIndexOptions options, 
RealtimeLuceneDocIdCollector docIDCollector)
+      throws Exception {
+    IndexSearcher indexSearcher = null;
+    try {
+      Query query = 
LuceneTextIndexUtils.createQueryParserWithOptions(actualQuery, options, 
_column, _analyzer);
+      indexSearcher = _searcherManager.acquire();
+      indexSearcher.search(query, docIDCollector);
+      return getPinotDocIds(indexSearcher, docIDCollector.getDocIds());
+    } finally {
+      if (indexSearcher != null) {
         try {
-          if (indexSearcher != null) {
-            _searcherManager.release(indexSearcher);
-          }
+          _searcherManager.release(indexSearcher);
         } catch (Exception e) {
-          LOGGER.error(
-              "Failed while releasing the searcher manager for realtime text 
index for column {}, exception {}",
-              _column, e.getMessage());
+          LOGGER.error("Failed while releasing searcher manager for column 
{}", _column, e);
         }
       }
-    };
-    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(searchCallable);
+    }
+  }
+
+  private MutableRoaringBitmap getDocIdsWithoutOptions(String searchQuery) {
+    MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
+    RealtimeLuceneDocIdCollector docIDCollector = new 
RealtimeLuceneDocIdCollector(docIDs);
+    // Search is executed in SEARCHER_POOL which is wrapped with 
contextAwareExecutorService(executor, false).
+    // This propagates QueryThreadContext for CPU/memory tracking without 
registering the task for cancellation,
+    // preventing Thread.interrupt() during Lucene search which could corrupt 
FSDirectory.
+    // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
+    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(
+        () -> executeSearchWithoutOptions(searchQuery, docIDCollector));
     try {
       return searchFuture.get();
     } catch (InterruptedException e) {
@@ -250,6 +218,48 @@ public class RealtimeLuceneTextIndex implements 
MutableTextIndex {
     }
   }
 
+  /**
+   * Executes Lucene search without options.
+   */
+  private MutableRoaringBitmap executeSearchWithoutOptions(String searchQuery,
+      RealtimeLuceneDocIdCollector docIDCollector) throws Exception {
+    IndexSearcher indexSearcher = null;
+    try {
+      // Lucene query parsers are generally stateful and a new instance must 
be created per query.
+      QueryParserBase parser = 
_queryParserClassConstructor.newInstance(_column, _analyzer);
+      if (_enablePrefixSuffixMatchingInPhraseQueries) {
+        // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
+        // the provided analyzer when wildcards are present, defaulting to the 
default analyzer for tokenization.
+        // Additionally, it does not support wildcards that span across terms.
+        // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
+        // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
+        parser.setAllowLeadingWildcard(true);
+      }
+      Query query = parser.parse(searchQuery);
+      if (_enablePrefixSuffixMatchingInPhraseQueries) {
+        // Note: Lucene's built-in QueryParser has limited wildcard 
functionality in phrase queries. It does not use
+        // the provided analyzer when wildcards are present, defaulting to the 
default analyzer for tokenization.
+        // Additionally, it does not support wildcards that span across terms.
+        // For more details, see: 
https://github.com/elastic/elasticsearch/issues/22540
+        // Workaround: Use a custom query parser that correctly implements 
wildcard searches.
+        query = LuceneTextIndexUtils.convertToMultiTermSpanQuery(query);
+      }
+      indexSearcher = _searcherManager.acquire();
+      indexSearcher.search(query, docIDCollector);
+      return getPinotDocIds(indexSearcher, docIDCollector.getDocIds());
+    } finally {
+      if (indexSearcher != null) {
+        try {
+          _searcherManager.release(indexSearcher);
+        } catch (Exception e) {
+          LOGGER.error(
+              "Failed while releasing the searcher manager for realtime text 
index for column {}, exception {}",
+              _column, e.getMessage());
+        }
+      }
+    }
+  }
+
   // TODO: Optimize this similar to how we have done for offline/completed 
segments.
   // Pre-built mapping will not work for realtime. We need to build an 
on-the-fly cache
   // as queries are coming in.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
index 9eb1126f4a0..edb8da4e2bd 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexSearcherPool.java
@@ -20,6 +20,7 @@ package 
org.apache.pinot.segment.local.realtime.impl.invertedindex;
 
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
+import org.apache.pinot.spi.query.QueryThreadContext;
 
 
 /**
@@ -27,13 +28,19 @@ import 
org.apache.pinot.common.utils.ScalingThreadPoolExecutor;
  * The pool max size is equivalent to 
pinot.query.scheduler.query_worker_threads to ensure each worker thread can have
  * an accompanying Lucene searcher thread if needed. init() is called in 
BaseServerStarter to avoid creating a
  * dependency on pinot-core.
+ *
+ * The executor is wrapped with 
QueryThreadContext.contextAwareExecutorService(executor, false) to propagate
+ * QueryThreadContext for CPU/memory tracking, but WITHOUT registering tasks 
for cancellation. This prevents
+ * Thread.interrupt() during Lucene search which could corrupt FSDirectory 
used by IndexWriter.
+ * See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
  */
 public class RealtimeLuceneTextIndexSearcherPool {
   private static RealtimeLuceneTextIndexSearcherPool _singletonInstance;
   private static ExecutorService _executorService;
 
   private RealtimeLuceneTextIndexSearcherPool(int size) {
-    _executorService = ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 
500);
+    ExecutorService baseExecutor = 
ScalingThreadPoolExecutor.newScalingThreadPool(0, size, 500);
+    _executorService = 
QueryThreadContext.contextAwareExecutorService(baseExecutor, false);
   }
 
   public static RealtimeLuceneTextIndexSearcherPool getInstance() {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java
index 53fa628a5a0..7987f266e42 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/vector/MutableVectorIndex.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.realtime.impl.vector;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.document.Document;
@@ -33,6 +34,7 @@ import org.apache.lucene.search.KnnFloatVectorQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.store.FSDirectory;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndexSearcherPool;
 import 
org.apache.pinot.segment.local.segment.creator.impl.vector.XKnnFloatVectorField;
 import org.apache.pinot.segment.local.segment.store.VectorIndexUtils;
 import org.apache.pinot.segment.spi.V1Constants;
@@ -51,6 +53,8 @@ import org.slf4j.LoggerFactory;
  */
 public class MutableVectorIndex implements VectorIndexReader, MutableIndex {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MutableVectorIndex.class);
+  private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL =
+      RealtimeLuceneTextIndexSearcherPool.getInstance();
   public static final String VECTOR_INDEX_DOC_ID_COLUMN_NAME = "DocID";
   public static final long DEFAULT_COMMIT_INTERVAL_MS = 10_000L;
   public static final long DEFAULT_COMMIT_DOCS = 1000L;
@@ -125,16 +129,30 @@ public class MutableVectorIndex implements 
VectorIndexReader, MutableIndex {
 
   @Override
   public MutableRoaringBitmap getDocIds(float[] vector, int topK) {
-    MutableRoaringBitmap docIds;
+    // Search is executed in SEARCHER_POOL which is wrapped with 
contextAwareExecutorService(executor, false).
+    // This propagates QueryThreadContext for CPU/memory tracking without 
registering the task for cancellation,
+    // preventing Thread.interrupt() during Lucene search which could corrupt 
FSDirectory.
+    // See https://github.com/apache/lucene/issues/3315 and 
https://github.com/apache/lucene/issues/9309
+    Future<MutableRoaringBitmap> searchFuture = 
SEARCHER_POOL.getExecutorService().submit(
+        () -> executeVectorSearch(vector, topK));
     try {
-      IndexSearcher indexSearcher = new 
IndexSearcher(DirectoryReader.open(_indexDirectory));
-      Query query = new KnnFloatVectorQuery(_vectorColumn, vector, topK);
-      docIds = new MutableRoaringBitmap();
-      TopDocs search = indexSearcher.search(query, topK);
-      Arrays.stream(search.scoreDocs).map(scoreDoc -> 
scoreDoc.doc).forEach(docIds::add);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+      return searchFuture.get();
+    } catch (InterruptedException e) {
+      searchFuture.cancel(false);
+      throw new RuntimeException("VECTOR_SIMILARITY query interrupted for 
segment " + _segmentName
+          + " column " + _vectorColumn, e);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed while searching vector index for 
segment " + _segmentName
+          + " column " + _vectorColumn, e);
     }
+  }
+
+  private MutableRoaringBitmap executeVectorSearch(float[] vector, int topK) 
throws IOException {
+    IndexSearcher indexSearcher = new 
IndexSearcher(DirectoryReader.open(_indexDirectory));
+    Query query = new KnnFloatVectorQuery(_vectorColumn, vector, topK);
+    MutableRoaringBitmap docIds = new MutableRoaringBitmap();
+    TopDocs search = indexSearcher.search(query, topK);
+    Arrays.stream(search.scoreDocs).map(scoreDoc -> 
scoreDoc.doc).forEach(docIds::add);
     return docIds;
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java
index e5d487dd602..e27914d6da7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/text/LuceneDocIdCollector.java
@@ -20,10 +20,12 @@ package 
org.apache.pinot.segment.local.segment.index.readers.text;
 
 import java.io.IOException;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -60,6 +62,8 @@ public class LuceneDocIdCollector implements Collector {
   public LeafCollector getLeafCollector(LeafReaderContext context) {
     return new LeafCollector() {
 
+      private int _numDocsCollected = 0;
+
       @Override
       public void setScorer(Scorable scorer)
           throws IOException {
@@ -69,6 +73,21 @@ public class LuceneDocIdCollector implements Collector {
       @Override
       public void collect(int doc)
           throws IOException {
+        try {
+          QueryThreadContext.checkTerminationAndSampleUsagePeriodically(
+              _numDocsCollected++, "LuceneDocIdCollector");
+        } catch (RuntimeException e) {
+          // Why CollectionTerminatedException: Lucene's 
IndexSearcher.search() specially handles this exception
+          // to gracefully stop document collection without treating it as an 
error. If we let the original
+          // TerminationException propagate, Lucene would wrap it and log 
errors unnecessarily.
+          //
+          // How termination info is preserved: When 
checkTerminationAndSampleUsagePeriodically() throws
+          // TerminationException (for OOM/timeout), it's already stored in 
QueryExecutionContext._terminateException
+          // before being thrown. After search completes, higher-level code 
retrieves the actual error via
+          // QueryThreadContext.getTerminateException() to include proper 
error details in query response.
+          throw new CollectionTerminatedException();
+        }
+
         // Compute the absolute lucene docID across
         // sub-indexes because that's how the lookup table in docIdTranslator 
is built
         _docIds.add(_docIdTranslator.getPinotDocId(context.docBase + doc));
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java
index 788e6f6dc8d..358f95af636 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/vector/HnswDocIdCollector.java
@@ -20,10 +20,12 @@ package 
org.apache.pinot.segment.local.segment.index.readers.vector;
 
 import java.io.IOException;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.CollectionTerminatedException;
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
@@ -59,6 +61,7 @@ public class HnswDocIdCollector implements Collector {
   @Override
   public LeafCollector getLeafCollector(LeafReaderContext context) {
     return new LeafCollector() {
+      private int _numDocsCollected = 0;
 
       @Override
       public void setScorer(Scorable scorer)
@@ -69,6 +72,13 @@ public class HnswDocIdCollector implements Collector {
       @Override
       public void collect(int doc)
           throws IOException {
+        try {
+          QueryThreadContext.checkTerminationAndSampleUsagePeriodically(
+              _numDocsCollected++, "HnswDocIdCollector");
+        } catch (RuntimeException e) {
+          throw new CollectionTerminatedException();
+        }
+
         // Compute the absolute lucene docID across
         // sub-indexes because that's how the lookup table in docIdTranslator 
is built
         _docIds.add(_docIdTranslator.getPinotDocId(context.docBase + doc));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
index 4dd3de5e53c..4bc21d1506a 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/LuceneMutableTextIndexTest.java
@@ -32,11 +32,14 @@ import org.apache.lucene.queryparser.classic.QueryParser;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import 
org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder;
 import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.util.TestUtils;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -52,12 +55,26 @@ public class LuceneMutableTextIndexTest {
   private static final RealtimeLuceneTextIndexSearcherPool SEARCHER_POOL =
       RealtimeLuceneTextIndexSearcherPool.init(1);
   private RealtimeLuceneTextIndex _realtimeLuceneTextIndex;
+  private QueryThreadContext _queryThreadContext;
 
   public LuceneMutableTextIndexTest() {
     RealtimeLuceneIndexRefreshManager.init(1, 10);
     ServerMetrics.register(mock(ServerMetrics.class));
   }
 
+  @BeforeMethod
+  public void setUpMethod() {
+    _queryThreadContext = QueryThreadContext.openForSseTest();
+  }
+
+  @AfterMethod
+  public void tearDownMethod() {
+    if (_queryThreadContext != null) {
+      _queryThreadContext.close();
+      _queryThreadContext = null;
+    }
+  }
+
   @Test
   public void testDefaultAnalyzerAndDefaultQueryParser() {
     // Test queries with standard analyzer with default configurations used by 
Pinot
@@ -259,9 +276,12 @@ public class LuceneMutableTextIndexTest {
   public void testQueryCancellationIsSuccessful()
       throws InterruptedException, ExecutionException {
     // Avoid early finalization by not using Executors.newSingleThreadExecutor 
(java <= 20, JDK-8145304)
-    ExecutorService executor = Executors.newFixedThreadPool(1);
+    ExecutorService baseExecutor = Executors.newFixedThreadPool(1);
+    // Wrap with contextAwareExecutorService to propagate QueryThreadContext 
to child threads
+    ExecutorService executor = 
QueryThreadContext.contextAwareExecutorService(baseExecutor);
     Future<MutableRoaringBitmap> res = executor.submit(() -> 
_realtimeLuceneTextIndex.getDocIds("/.*read.*/"));
-    executor.shutdownNow();
+    // Shutdown the base executor to trigger interrupt on the worker thread
+    baseExecutor.shutdownNow();
     res.get();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java
new file mode 100644
index 00000000000..145da5ea94a
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/RealtimeLuceneTextIndexResourceTrackingTest.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import 
org.apache.pinot.segment.local.segment.index.text.TextIndexConfigBuilder;
+import org.apache.pinot.segment.spi.index.TextIndexConfig;
+import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceTracker;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.util.TestUtils;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class RealtimeLuceneTextIndexResourceTrackingTest {
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
"RealtimeLuceneTextIndexResourceTrackingTest");
+  private static final String TEXT_COLUMN_NAME = "textColumn";
+  private static final AtomicInteger SEGMENT_COUNTER = new AtomicInteger(0);
+
+  private RealtimeLuceneTextIndex _textIndex;
+
+  static {
+    try {
+      RealtimeLuceneTextIndexSearcherPool.init(2);
+    } catch (AssertionError e) {
+      // Already initialized
+    }
+    RealtimeLuceneIndexRefreshManager.init(1, 10);
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @BeforeClass
+  public void setUpClass() throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    FileUtils.forceMkdir(INDEX_DIR);
+    RealtimeLuceneIndexRefreshManager.getInstance().reset();
+  }
+
+  @AfterClass
+  public void tearDownClass() throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @BeforeMethod
+  public void setUp() {
+    TextIndexConfig config = new TextIndexConfigBuilder().build();
+    String segmentName = "segment_" + SEGMENT_COUNTER.getAndIncrement() + 
"__0__1__20240101T0000Z";
+    _textIndex = new RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, 
segmentName, config);
+
+    for (int i = 0; i < 100; i++) {
+      _textIndex.add("document number " + i + " with searchable content");
+    }
+    _textIndex.commit();
+
+    // Wait for index refresh
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return _textIndex.getSearcherManager().isSearcherCurrent();
+      } catch (IOException e) {
+        return false;
+      }
+    }, 5000, "Index refresh timeout");
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    if (_textIndex != null) {
+      _textIndex.close();
+      _textIndex = null;
+    }
+  }
+
+  @Test
+  public void testSearcherThreadRegisteredWithAccountant() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+
+    try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+      MutableRoaringBitmap result = _textIndex.getDocIds("searchable");
+
+      assertTrue(result.getCardinality() > 0, "Search should return results");
+
+      assertTrue(accountant.getSetupTaskCount() >= 2,
+          "setupTask should be called for both parent and searcher threads, 
got: " + accountant.getSetupTaskCount());
+    }
+  }
+
+  @Test
+  public void testResourceUsageSampledDuringSearch() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+
+    try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+      // Perform search with many matching docs to trigger periodic sampling
+      MutableRoaringBitmap result = _textIndex.getDocIds("document");
+
+      assertTrue(result.getCardinality() > 0, "Search should return results");
+    }
+
+    // After context close, clear should have been called
+    assertTrue(accountant.getClearCount() >= 1, "clear() should be called on 
context close");
+  }
+
+  @Test
+  public void testQueryTerminationDetectedByCollector() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+
+    // Add many more documents for longer search
+    TextIndexConfig config = new TextIndexConfigBuilder().build();
+    String segmentName = "segment_termination_" + 
SEGMENT_COUNTER.getAndIncrement() + "__0__1__20240101T0000Z";
+    RealtimeLuceneTextIndex largeIndex = new 
RealtimeLuceneTextIndex(TEXT_COLUMN_NAME, INDEX_DIR, segmentName, config);
+
+    try {
+      for (int i = 0; i < 50000; i++) {
+        largeIndex.add("document number " + i + " with lots of searchable 
content for termination test");
+      }
+      largeIndex.commit();
+
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          return largeIndex.getSearcherManager().isSearcherCurrent();
+        } catch (IOException e) {
+          return false;
+        }
+      }, 10000, "Index refresh timeout");
+
+      ExecutorService baseExecutor = Executors.newSingleThreadExecutor();
+      try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+        // Wrap with contextAwareExecutorService but don't register for 
cancellation (false).
+        ExecutorService executor = 
QueryThreadContext.contextAwareExecutorService(baseExecutor, false);
+
+        // Start a search that will take some time
+        Future<MutableRoaringBitmap> searchFuture = executor.submit(
+            () -> largeIndex.getDocIds("/.*searchable.*/"));
+
+        // Give the search a moment to start
+        Thread.sleep(50);
+
+        // Terminate the query - this sets the termination flag that the 
collector will detect
+        
executionContext.terminate(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED, "OOM 
test");
+
+        try {
+          searchFuture.get();
+          // Search might complete before termination is detected - that's OK 
for this test
+        } catch (ExecutionException e) {
+          // Expected - search was terminated via collector detecting the 
termination flag
+          assertTrue(e.getCause() instanceof RuntimeException || e.getCause() 
instanceof Error,
+              "Should throw RuntimeException or Error on termination, got: " + 
e.getCause().getClass().getName());
+        } catch (CancellationException e) {
+          // Also acceptable if future was cancelled
+        }
+      } finally {
+        // Ensure executor is fully shut down before closing the index to 
prevent
+        baseExecutor.shutdown();
+        baseExecutor.awaitTermination(5, TimeUnit.SECONDS);
+      }
+    } finally {
+      largeIndex.close();
+    }
+  }
+
+  @Test(expectedExceptions = ExecutionException.class,
+      expectedExceptionsMessageRegExp = ".*TEXT_MATCH query interrupted.*")
+  public void testInterruptTriggersShouldCancel() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+
+    try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+      ExecutorService baseExecutor = Executors.newFixedThreadPool(1);
+      ExecutorService executor = 
QueryThreadContext.contextAwareExecutorService(baseExecutor);
+      Future<MutableRoaringBitmap> future = executor.submit(() -> 
_textIndex.getDocIds("/.*content.*/"));
+      baseExecutor.shutdownNow();
+      future.get();
+    }
+  }
+
+  @Test
+  public void testConcurrentSearchesTrackedIndependently() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+    // Wrap with contextAwareExecutorService to propagate QueryThreadContext 
to child threads
+    ExecutorService baseExecutor = Executors.newFixedThreadPool(3);
+    ExecutorService executor = 
QueryThreadContext.contextAwareExecutorService(baseExecutor);
+
+    try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+      Future<MutableRoaringBitmap> future1 = executor.submit(() -> 
_textIndex.getDocIds("document"));
+      Future<MutableRoaringBitmap> future2 = executor.submit(() -> 
_textIndex.getDocIds("number"));
+      Future<MutableRoaringBitmap> future3 = executor.submit(() -> 
_textIndex.getDocIds("searchable"));
+
+      MutableRoaringBitmap result1 = future1.get();
+      MutableRoaringBitmap result2 = future2.get();
+      MutableRoaringBitmap result3 = future3.get();
+
+      assertTrue(result1.getCardinality() > 0);
+      assertTrue(result2.getCardinality() > 0);
+      assertTrue(result3.getCardinality() > 0);
+
+      // Parent thread + 3 executor threads + their searcher threads = 
multiple setupTask calls
+      // Due to thread pool reuse and timing, we expect at least 4 (1 parent + 
3 searches)
+      assertTrue(accountant.getSetupTaskCount() >= 4,
+          "Multiple searches should register threads, got: " + 
accountant.getSetupTaskCount());
+    }
+
+    baseExecutor.shutdown();
+  }
+
+  @Test
+  public void testExecutionContextSharedWithSearcherThread() throws Exception {
+    TrackingAccountant accountant = new TrackingAccountant();
+    QueryExecutionContext executionContext = 
QueryExecutionContext.forSseTest();
+
+    try (QueryThreadContext ignored = 
QueryThreadContext.open(executionContext, accountant)) {
+      _textIndex.getDocIds("document");
+
+      // Verify all registered contexts share the same executionContext
+      for (QueryThreadContext ctx : accountant.getRegisteredContexts()) {
+        assertEquals(ctx.getExecutionContext(), executionContext,
+            "All thread contexts should share the same execution context");
+      }
+    }
+  }
+
+  /**
+   * A ThreadAccountant implementation that tracks calls for testing purposes.
+   */
+  private static class TrackingAccountant implements ThreadAccountant {
+    private final AtomicInteger _setupTaskCount = new AtomicInteger(0);
+    private final AtomicInteger _sampleUsageCount = new AtomicInteger(0);
+    private final AtomicInteger _clearCount = new AtomicInteger(0);
+    private final ConcurrentHashMap<Thread, QueryThreadContext> 
_registeredContexts = new ConcurrentHashMap<>();
+
+    @Override
+    public void setupTask(QueryThreadContext threadContext) {
+      _setupTaskCount.incrementAndGet();
+      _registeredContexts.put(Thread.currentThread(), threadContext);
+    }
+
+    @Override
+    public void sampleUsage() {
+      _sampleUsageCount.incrementAndGet();
+    }
+
+    @Override
+    public void clear() {
+      _clearCount.incrementAndGet();
+      _registeredContexts.remove(Thread.currentThread());
+    }
+
+    @Override
+    public void updateUntrackedResourceUsage(String identifier, long 
cpuTimeNs, long allocatedBytes,
+        org.apache.pinot.spi.accounting.TrackingScope trackingScope) {
+    }
+
+    @Override
+    public Collection<? extends ThreadResourceTracker> getThreadResources() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public Map<String, ? extends QueryResourceTracker> getQueryResources() {
+      return Collections.emptyMap();
+    }
+
+    public int getSetupTaskCount() {
+      return _setupTaskCount.get();
+    }
+
+    public int getSampleUsageCount() {
+      return _sampleUsageCount.get();
+    }
+
+    public int getClearCount() {
+      return _clearCount.get();
+    }
+
+    public Collection<QueryThreadContext> getRegisteredContexts() {
+      return _registeredContexts.values();
+    }
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index 2feaa2fbee2..ef3823578fe 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.accounting.ThreadAccountant;
@@ -178,9 +180,22 @@ public class QueryThreadContext implements AutoCloseable {
   }
 
   /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
-  /// state of the thread submitting the tasks.
+  /// state of the thread submitting the tasks. Tasks are registered for 
cancellation when the query is terminated.
   public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return new DecoratorExecutorService(executorService, future -> 
get().getExecutionContext().addTask(future)) {
+    return contextAwareExecutorService(executorService, true);
+  }
+
+  /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
+  /// state of the thread submitting the tasks.
+  /// @param registerTaskForCancellation If true, tasks are registered with 
QueryExecutionContext.addTask() and will
+  ///                                    be cancelled (via Thread.interrupt()) 
when the query is terminated. Set to
+  ///                                    false for executors where interrupt 
could cause corruption
+  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService,
+      boolean registerTaskForCancellation) {
+    Consumer<Future<?>> onSubmit = registerTaskForCancellation
+        ? future -> get().getExecutionContext().addTask(future)
+        : null;
+    return new DecoratorExecutorService(executorService, onSubmit) {
       @Override
       protected <T> Callable<T> decorate(Callable<T> task) {
         QueryThreadContext parentThreadContext = get();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to