This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6984719f24 Implement Mutable FST Index (#8861)
6984719f24 is described below

commit 6984719f24ecc6f7ee882a2c0f9012cc57037640
Author: Atri Sharma <[email protected]>
AuthorDate: Thu Jun 9 14:04:57 2022 +0530

    Implement Mutable FST Index (#8861)
    
    This PR implements a real-time FST index, allowing fields containing
tokenized documents to be stored directly in the FST index and queried in real
time, without requiring a flush.
    
    This can be enabled on a real-time table by setting the following property
in the properties section of the FST index column's field config:
    "properties": {"fstType": "native"}
---
 .../indexsegment/mutable/MutableSegmentImpl.java   | 35 +++++++--
 .../impl/invertedindex/NativeMutableFSTIndex.java  | 80 ++++++++++++++++++++
 .../index/column/IntermediateIndexContainer.java   |  2 +-
 .../index/datasource/MutableDataSource.java        |  7 +-
 .../NativeAndLuceneMutableTextIndexTest.java       |  4 -
 .../invertedindex/NativeMutableFSTIndexTest.java   | 88 ++++++++++++++++++++++
 6 files changed, 202 insertions(+), 14 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index fabb501280..1f17cd682e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -50,6 +50,7 @@ import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import 
org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import 
org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
 import org.apache.pinot.segment.local.realtime.impl.geospatial.MutableH3Index;
+import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableFSTIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.NativeMutableTextIndex;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneIndexRefreshState;
 import 
org.apache.pinot.segment.local.realtime.impl.invertedindex.RealtimeLuceneTextIndex;
@@ -242,7 +243,6 @@ public class MutableSegmentImpl implements MutableSegment {
     Set<String> noDictionaryColumns = config.getNoDictionaryColumns();
     Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
     Set<String> textIndexColumns = config.getTextIndexColumns();
-    // TODO: Add mutable FST and wire it up
     Set<String> fstIndexColumns = config.getFSTIndexColumns();
     Set<String> jsonIndexColumns = config.getJsonIndexColumns();
     Map<String, H3IndexConfig> h3IndexConfigs = config.getH3IndexConfigs();
@@ -306,6 +306,19 @@ public class MutableSegmentImpl implements MutableSegment {
       MutableInvertedIndex invertedIndexReader =
           invertedIndexColumns.contains(column) ? 
indexProvider.newInvertedIndex(context.forInvertedIndex()) : null;
 
+      MutableTextIndex fstIndex = null;
+      // FST Index
+      if (_fieldConfigList != null && fstIndexColumns.contains(column)) {
+        for (FieldConfig fieldConfig : _fieldConfigList) {
+          if (fieldConfig.getName().equals(column)) {
+            Map<String, String> properties = fieldConfig.getProperties();
+            if (TextIndexUtils.isFstTypeNative(properties)) {
+              fstIndex = new NativeMutableFSTIndex(column);
+            }
+          }
+        }
+      }
+
       // Text index
       MutableTextIndex textIndex;
       if (textIndexColumns.contains(column)) {
@@ -365,7 +378,7 @@ public class MutableSegmentImpl implements MutableSegment {
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
           new IndexContainer(fieldSpec, partitionFunction, partitions, new 
NumValuesInfo(), forwardIndex, dictionary,
-              invertedIndexReader, null, textIndex, jsonIndex, h3Index, null, 
nullValueVector, sourceColumn,
+              invertedIndexReader, null, textIndex, fstIndex, jsonIndex, 
h3Index, null, nullValueVector, sourceColumn,
               valueAggregator));
     }
 
@@ -1293,6 +1306,7 @@ public class MutableSegmentImpl implements MutableSegment 
{
     final RangeIndexReader _rangeIndex;
     final MutableH3Index _h3Index;
     final MutableTextIndex _textIndex;
+    final MutableTextIndex _fstIndex;
     final MutableJsonIndex _jsonIndex;
     final BloomFilterReader _bloomFilter;
     final MutableNullValueVector _nullValueVector;
@@ -1310,9 +1324,9 @@ public class MutableSegmentImpl implements MutableSegment 
{
         @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, 
MutableForwardIndex forwardIndex,
         @Nullable MutableDictionary dictionary, @Nullable MutableInvertedIndex 
invertedIndex,
         @Nullable RangeIndexReader rangeIndex, @Nullable MutableTextIndex 
textIndex,
-        @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index 
h3Index, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector, @Nullable String 
sourceColumn,
-        @Nullable ValueAggregator valueAggregator) {
+        @Nullable MutableTextIndex fstIndex, @Nullable MutableJsonIndex 
jsonIndex, @Nullable MutableH3Index h3Index,
+        @Nullable BloomFilterReader bloomFilter, @Nullable 
MutableNullValueVector nullValueVector,
+        @Nullable String sourceColumn, @Nullable ValueAggregator 
valueAggregator) {
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
       _partitions = partitions;
@@ -1324,6 +1338,7 @@ public class MutableSegmentImpl implements MutableSegment 
{
       _h3Index = h3Index;
 
       _textIndex = textIndex;
+      _fstIndex = fstIndex;
       _jsonIndex = jsonIndex;
       _bloomFilter = bloomFilter;
       _nullValueVector = nullValueVector;
@@ -1334,7 +1349,8 @@ public class MutableSegmentImpl implements MutableSegment 
{
     DataSource toDataSource() {
       return new MutableDataSource(_fieldSpec, _numDocsIndexed, 
_numValuesInfo._numValues,
           _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, 
_partitions, _minValue, _maxValue, _forwardIndex,
-          _dictionary, _invertedIndex, _rangeIndex, _textIndex, _jsonIndex, 
_h3Index, _bloomFilter, _nullValueVector);
+          _dictionary, _invertedIndex, _rangeIndex, _textIndex, _fstIndex, 
_jsonIndex, _h3Index, _bloomFilter,
+          _nullValueVector);
     }
 
     @Override
@@ -1374,6 +1390,13 @@ public class MutableSegmentImpl implements 
MutableSegment {
           _logger.error("Caught exception while closing text index for column: 
{}, continuing with error", column, e);
         }
       }
+      if (_fstIndex != null) {
+        try {
+          _fstIndex.close();
+        } catch (Exception e) {
+          _logger.error("Caught exception while closing fst index for column: 
{}, continuing with error", column, e);
+        }
+      }
       if (_jsonIndex != null) {
         try {
           _jsonIndex.close();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndex.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndex.java
new file mode 100644
index 0000000000..bea06abbde
--- /dev/null
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndex.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFST;
+import 
org.apache.pinot.segment.local.utils.nativefst.mutablefst.MutableFSTImpl;
+import 
org.apache.pinot.segment.local.utils.nativefst.utils.RealTimeRegexpMatcher;
+import org.apache.pinot.segment.spi.index.mutable.MutableTextIndex;
+import org.roaringbitmap.RoaringBitmapWriter;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Realtime (mutable) FST index for a single column, backed by a {@link MutableFST}.
+ *
+ * <p>Each string passed to {@link #add(String)} is inserted into the FST as one path whose output is an int id
+ * taken from an internal counter. The counter starts at 0 and is incremented once per {@code add} call, so ids
+ * follow insertion order. {@link #getDictIds(String)} runs a regex over the FST and returns the ids of the
+ * matching paths. {@link #getDocIds(String)} is not supported.
+ *
+ * <p>Concurrency: a {@link ReentrantReadWriteLock} guards the FST and the id counter. {@code add} takes the
+ * write lock and {@code getDictIds} takes the read lock, so lookups can run concurrently with each other but
+ * not with an insertion.
+ *
+ * <p>NOTE(review): ids come from this class's own counter, not from a dictionary. They only line up with the
+ * column's dictionary ids if the caller adds each distinct value exactly once, in dictionary-id order. Confirm
+ * this against the code that calls {@code add}. What {@code MutableFST.addPath} does when the same string is
+ * added twice is not visible here.
+ */
+public class NativeMutableFSTIndex implements MutableTextIndex {
+  // Column name; stored at construction but not read anywhere in this class.
+  private final String _column;
+  private final MutableFST _fst;
+  private final ReentrantReadWriteLock.ReadLock _readLock;
+  private final ReentrantReadWriteLock.WriteLock _writeLock;
+  // Id assigned to the next added value (starts at 0); only read or written while holding the write lock.
+  private int _dictId;
+
+  /**
+   * Creates an empty index.
+   *
+   * @param column name of the column this index is built for
+   */
+  public NativeMutableFSTIndex(String column) {
+    _column = column;
+    _fst = new MutableFSTImpl();
+
+    // Both views come from one lock instance, so readers and the writer exclude each other.
+    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    _readLock = readWriteLock.readLock();
+    _writeLock = readWriteLock.writeLock();
+  }
+
+  /**
+   * Inserts {@code document} into the FST, mapped to the next sequential id.
+   *
+   * @param document value to index; it is added as a single FST path (no tokenization happens here)
+   */
+  @Override
+  public void add(String document) {
+    _writeLock.lock();
+    try {
+      _fst.addPath(document, _dictId);
+      _dictId++;
+    } finally {
+      _writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the ids of all indexed values matched by {@code searchQuery}.
+   *
+   * <p>Matching is delegated to {@code RealTimeRegexpMatcher.regexMatch}; the exact regex dialect is defined
+   * there (presumably the same syntax as the immutable native FST index; TODO confirm).
+   *
+   * @param searchQuery regular expression to match against the indexed values
+   * @return bitmap of the ids of matching values; empty if nothing matches
+   */
+  @Override
+  public ImmutableRoaringBitmap getDictIds(String searchQuery) {
+    // The writer is local to this call, so it can be created before taking the lock.
+    RoaringBitmapWriter<MutableRoaringBitmap> writer = 
RoaringBitmapWriter.bufferWriter().get();
+    _readLock.lock();
+    try {
+      RealTimeRegexpMatcher.regexMatch(searchQuery, _fst, writer::add);
+      return writer.get();
+    } finally {
+      _readLock.unlock();
+    }
+  }
+
+  /**
+   * Not supported by this index; use {@link #getDictIds(String)} instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public MutableRoaringBitmap getDocIds(String searchQuery) {
+    throw new UnsupportedOperationException("getDocIds is not supported for 
NativeMutableFSTIndex");
+  }
+
+  /**
+   * No-op: nothing is released here.
+   * NOTE(review): assumes MutableFSTImpl holds no off-heap or file resources; confirm.
+   */
+  @Override
+  public void close()
+      throws IOException {
+  }
+}
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
index d1893eee06..3e90494adb 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/IntermediateIndexContainer.java
@@ -63,7 +63,7 @@ public class IntermediateIndexContainer implements Closeable {
   public DataSource toDataSource(int numDocsIndexed) {
     return new MutableDataSource(_fieldSpec, numDocsIndexed, 
_numValuesInfo._numValues,
         _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, 
_partitions, _minValue, _maxValue, _forwardIndex,
-        _dictionary, null, null, null, null, null, null, null);
+        _dictionary, null, null, null, null, null, null, null, null);
   }
 
   @Override
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
index 1d22e12c58..0627212577 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
@@ -44,10 +44,11 @@ public class MutableDataSource extends BaseDataSource {
       @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> 
partitions, @Nullable Comparable minValue,
       @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, 
@Nullable Dictionary dictionary,
       @Nullable InvertedIndexReader invertedIndex, @Nullable RangeIndexReader 
rangeIndex,
-      @Nullable TextIndexReader textIndex, @Nullable JsonIndexReader 
jsonIndex, @Nullable H3IndexReader h3Index,
-      @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader 
nullValueVector) {
+      @Nullable TextIndexReader textIndex, @Nullable TextIndexReader fstIndex, 
@Nullable JsonIndexReader jsonIndex,
+      @Nullable H3IndexReader h3Index, @Nullable BloomFilterReader bloomFilter,
+      @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, 
maxNumValuesPerMVEntry, partitionFunction,
-            partitions, minValue, maxValue), forwardIndex, dictionary, 
invertedIndex, rangeIndex, textIndex, null,
+            partitions, minValue, maxValue), forwardIndex, dictionary, 
invertedIndex, rangeIndex, textIndex, fstIndex,
         jsonIndex, h3Index, bloomFilter, nullValueVector);
   }
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
index b79482a1f0..094ffff809 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeAndLuceneMutableTextIndexTest.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.lucene.search.SearcherManager;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -99,9 +98,6 @@ public class NativeAndLuceneMutableTextIndexTest {
   }
 
   private void testSelectionResults(String nativeQuery, String luceneQuery) {
-    MutableRoaringBitmap resultset = 
_realtimeLuceneTextIndex.getDocIds(luceneQuery);
-    MutableRoaringBitmap resultset2 = 
_nativeMutableTextIndex.getDocIds(nativeQuery);
-
     assertEquals(_nativeMutableTextIndex.getDocIds(nativeQuery), 
_realtimeLuceneTextIndex.getDocIds(luceneQuery));
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndexTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndexTest.java
new file mode 100644
index 0000000000..0846aeb3db
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/invertedindex/NativeMutableFSTIndexTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.realtime.impl.invertedindex;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests regex lookups on {@link NativeMutableFSTIndex}.
+ *
+ * <p>Each word from {@link #getTextData()} is added once, in list order. The index assigns sequential ids
+ * starting at 0, so the expected id of a word is its position in that list (e.g. "Prince" is 0, "Pierre" is 9).
+ */
+public class NativeMutableFSTIndexTest {
+  private static final String TEXT_COLUMN_NAME = "testColumnName";
+  private NativeMutableFSTIndex _nativeMutableFSTIndex;
+
+  /** Builds the index once for the class from the fixed word list. */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _nativeMutableFSTIndex = new NativeMutableFSTIndex(TEXT_COLUMN_NAME);
+    List<String> documents = getTextData();
+
+    for (String doc : documents) {
+      _nativeMutableFSTIndex.add(doc);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _nativeMutableFSTIndex.close();
+  }
+
+  /** Checks prefix, suffix and infix regex queries against the expected word ids. */
+  @Test
+  public void testQueries() {
+    // Expected: "Prince" (0) and "Pierre" (9).
+    String nativeQuery = "P.*";
+    List<Integer> resultList = Arrays.asList(0, 9);
+    testSelectionResults(nativeQuery, 2, resultList);
+
+    // Expected: "an" (5) and "amused" (6). The count of 2 excludes "Andrew" (1), i.e. matching is case-sensitive.
+    nativeQuery = "a.*";
+    resultList = Arrays.asList(5, 6);
+    testSelectionResults(nativeQuery, 2, resultList);
+
+    // Expected: "amused" (6) only.
+    nativeQuery = ".*ed";
+    resultList = Arrays.asList(6);
+    testSelectionResults(nativeQuery, 1, resultList);
+
+    // Expected: "amused" (6), "smile" (7) and "from" (8).
+    nativeQuery = ".*m.*";
+    resultList = Arrays.asList(6, 7, 8);
+    testSelectionResults(nativeQuery, 3, resultList);
+  }
+
+  /** Returns the words indexed in {@link #setUp()}; a word's position in this list is its expected id. */
+  private List<String> getTextData() {
+    return Arrays.asList("Prince", "Andrew", "kept", "looking", "with", "an", 
"amused", "smile", "from", "Pierre");
+  }
+
+  /**
+   * Runs {@code nativeQuery} against the index and checks the result.
+   *
+   * @param nativeQuery regex passed to {@link NativeMutableFSTIndex#getDictIds(String)}
+   * @param resultCount expected number of matching ids
+   * @param results ids that must all be present in the result, or {@code null} to check only the count
+   */
+  private void testSelectionResults(String nativeQuery, int resultCount, 
@Nullable List<Integer> results) {
+    ImmutableRoaringBitmap resultMap = 
_nativeMutableFSTIndex.getDictIds(nativeQuery);
+    assertEquals(resultMap.getCardinality(), resultCount);
+
+    if (results != null) {
+      for (int result : results) {
+        assertTrue(resultMap.contains(result));
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to