This is an automated email from the ASF dual-hosted git repository.

saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new caacf84971 Change bitmap inverted index to be able to be embedded in a buffer. (#10370)
caacf84971 is described below

commit caacf849717fe97e4b655dc9bf885eb4eb410b21
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Apr 3 13:05:01 2023 +0200

    Change bitmap inverted index to be able to be embedded in a buffer. (#10370)
    
    * Change bitmap inverted index to be able to be embedded in a buffer.
    
    * BitmapInvertedIndexWriter will always truncate the file, even if it doesn't own the channel
    
    * Remove changes in BitmapInvertedIndexReader, as they are not actually needed
    
    * Replace some `new File(...)` calls with `getDefaultFile`
    
    * Add javadoc to constructors
    
    * Change _bytesWritten to _currentBufferPosition
---
 .../impl/inv/BitmapInvertedIndexWriter.java        | 47 ++++++++---
 .../inv/OffHeapBitmapInvertedIndexCreator.java     | 91 +++++++++++++++++++---
 2 files changed, 117 insertions(+), 21 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitmapInvertedIndexWriter.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitmapInvertedIndexWriter.java
index 2e13a65c7e..c5a64ad68e 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitmapInvertedIndexWriter.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitmapInvertedIndexWriter.java
@@ -56,15 +56,34 @@ public final class BitmapInvertedIndexWriter implements 
Closeable {
   private final FileChannel _fileChannel;
   private final ByteBuffer _offsetBuffer;
   private ByteBuffer _bitmapBuffer;
-  private long _bytesWritten;
+  private long _currentBufferPosition;
+  private final boolean _ownsChannel;
 
   public BitmapInvertedIndexWriter(File outputFile, int numBitmaps)
       throws IOException {
+    this(new RandomAccessFile(outputFile, "rw").getChannel(), numBitmaps, 
true);
+  }
+
+  /**
+   * Creates a new writer that uses the given {@link FileChannel}.
+   * It will start to write on the current position of the channel assuming it 
is the last useful byte in the file.
+   * When this object is {@link #close() closed}, the channel is truncated to 
the last byte written by this writer.
+   * @param fileChannel the file channel to be used
+   * @param numBitmaps the number of bitmaps that are expected. The actual 
value cannot be higher than this value. Fewer
+   *                   bitmaps than the given value can be used, but in that 
case the representation will not be as
+   *                   expected.
+   * @param ownsChannel whether this writer owns the channel or not. If the 
channel is owned then it will be closed when
+   *                    this object is closed. Otherwise the owner will have 
to close it by itself. Even if this writer
+   *                    does not own the channel, it will be truncated when 
the writer is closed.
+   */
+  public BitmapInvertedIndexWriter(FileChannel fileChannel, int numBitmaps, 
boolean ownsChannel)
+      throws IOException {
+    _ownsChannel = ownsChannel;
     int sizeForOffsets = (numBitmaps + 1) * Integer.BYTES;
     long bitmapBufferEstimate = Math.min(PESSIMISTIC_BITMAP_SIZE_ESTIMATE * 
numBitmaps, MAX_INITIAL_BUFFER_SIZE);
-    _fileChannel = new RandomAccessFile(outputFile, "rw").getChannel();
-    _offsetBuffer = _fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 
sizeForOffsets);
-    _bytesWritten = sizeForOffsets;
+    _fileChannel = fileChannel;
+    _offsetBuffer = _fileChannel.map(FileChannel.MapMode.READ_WRITE, 
_fileChannel.position(), sizeForOffsets);
+    _currentBufferPosition = sizeForOffsets + _fileChannel.position();
     mapBitmapBuffer(bitmapBufferEstimate);
   }
 
@@ -72,9 +91,9 @@ public final class BitmapInvertedIndexWriter implements 
Closeable {
       throws IOException {
     int length = bitmap.serializedSizeInBytes();
     resizeIfNecessary(length);
-    _offsetBuffer.putInt(asUnsignedInt(_bytesWritten));
+    _offsetBuffer.putInt(asUnsignedInt(_currentBufferPosition));
     bitmap.serialize(_bitmapBuffer);
-    _bytesWritten += length;
+    _currentBufferPosition += length;
   }
 
   public void add(byte[] bitmapBytes)
@@ -85,9 +104,9 @@ public final class BitmapInvertedIndexWriter implements 
Closeable {
   public void add(byte[] bitmapBytes, int length)
       throws IOException {
     resizeIfNecessary(length);
-    _offsetBuffer.putInt(asUnsignedInt(_bytesWritten));
+    _offsetBuffer.putInt(asUnsignedInt(_currentBufferPosition));
     _bitmapBuffer.put(bitmapBytes, 0, length);
-    _bytesWritten += length;
+    _currentBufferPosition += length;
   }
 
   private void resizeIfNecessary(int required)
@@ -100,7 +119,7 @@ public final class BitmapInvertedIndexWriter implements 
Closeable {
   private void mapBitmapBuffer(long size)
       throws IOException {
     cleanBitmapBuffer();
-    _bitmapBuffer = _fileChannel.map(FileChannel.MapMode.READ_WRITE, 
_bytesWritten, size)
+    _bitmapBuffer = _fileChannel.map(FileChannel.MapMode.READ_WRITE, 
_currentBufferPosition, size)
         .order(ByteOrder.LITTLE_ENDIAN);
   }
 
@@ -111,13 +130,19 @@ public final class BitmapInvertedIndexWriter implements 
Closeable {
     }
   }
 
+  public long getLastWrittenPosition() {
+    return _currentBufferPosition;
+  }
+
   @Override
   public void close()
       throws IOException {
-    long fileLength = _bytesWritten;
+    long fileLength = _currentBufferPosition;
     _offsetBuffer.putInt(asUnsignedInt(fileLength));
     _fileChannel.truncate(fileLength);
-    _fileChannel.close();
+    if (_ownsChannel) {
+      _fileChannel.close();
+    }
     if (CleanerUtil.UNMAP_SUPPORTED) {
       CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
       cleaner.freeBuffer(_offsetBuffer);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index 0230485026..4dc1cc6316 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -18,9 +18,13 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.inv;
 
+import it.unimi.dsi.fastutil.ints.IntIterator;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.file.attribute.FileAttribute;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import 
org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -80,16 +84,52 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements DictionaryBasedI
   private PinotDataBuffer _invertedIndexValueBuffer;
   private PinotDataBuffer _invertedIndexLengthBuffer;
 
+  /**
+   * Like calling {@link #OffHeapBitmapInvertedIndexCreator(File, FieldSpec, 
int, int, int, String)} with the default
+   * {@link V1Constants.Indexes#BITMAP_INVERTED_INDEX_FILE_EXTENSION}.
+   *
+   * @see #OffHeapBitmapInvertedIndexCreator(File, FieldSpec, int, int, int, 
String)
+   */
   public OffHeapBitmapInvertedIndexCreator(File indexDir, FieldSpec fieldSpec, 
int cardinality, int numDocs,
       int numValues)
       throws IOException {
-    String columnName = fieldSpec.getName();
-    _invertedIndexFile = new File(indexDir, columnName + 
V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _forwardIndexValueBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
-    _forwardIndexLengthBufferFile = new File(indexDir, columnName + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
-    _invertedIndexValueBufferFile = new File(indexDir, columnName + 
INVERTED_INDEX_VALUE_BUFFER_SUFFIX);
-    _invertedIndexLengthBufferFile = new File(indexDir, columnName + 
INVERTED_INDEX_LENGTH_BUFFER_SUFFIX);
-    _singleValue = fieldSpec.isSingleValueField();
+    this(indexDir, fieldSpec, cardinality, numDocs, numValues,
+        V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
+  }
+
+  /**
+   * Like calling {@link #OffHeapBitmapInvertedIndexCreator(File, String, 
boolean, int, int, int, String)} with
+   * the column name and single value specified by the given {@link FieldSpec}.
+   *
+   * @see #OffHeapBitmapInvertedIndexCreator(File, String, boolean, int, int, 
int, String)
+   */
+  public OffHeapBitmapInvertedIndexCreator(File indexDir, FieldSpec fieldSpec, 
int cardinality, int numDocs,
+      int numValues, String extension)
+      throws IOException {
+    this(indexDir, fieldSpec.getName(), fieldSpec.isSingleValueField(), 
cardinality, numDocs, numValues, extension);
+  }
+
+  /**
+   * @param indexDir The directory where the index will be created.
+   * @param columnName The name of the column being indexed.
+   * @param singleValue True iff the column is single value.
+   * @param cardinality How many different values the column has.
+   * @param numDocs How many documents are expected.
+   * @param numValues How many values the index will have. This should be 
equal to numDocs in single value columns, but
+   *                  may be higher in multivalued columns.
+   * @param extension The suffix added to the file. Although is called 
extension, it behaves like
+   * {@link java.nio.file.Files#createTempFile(String, String, 
FileAttribute[])} suffix parameter.
+   */
+  public OffHeapBitmapInvertedIndexCreator(File indexDir, String columnName, 
boolean singleValue, int cardinality,
+      int numDocs, int numValues, String extension)
+      throws IOException {
+    String ext = 
extension.equals(V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION) ? "" 
: "." + extension;
+    _invertedIndexFile = getDefaultFile(indexDir, columnName, extension);
+    _forwardIndexValueBufferFile = getDefaultFile(indexDir, columnName, ext + 
FORWARD_INDEX_VALUE_BUFFER_SUFFIX);
+    _forwardIndexLengthBufferFile = getDefaultFile(indexDir, columnName, ext + 
FORWARD_INDEX_LENGTH_BUFFER_SUFFIX);
+    _invertedIndexValueBufferFile = getDefaultFile(indexDir, columnName, ext + 
INVERTED_INDEX_VALUE_BUFFER_SUFFIX);
+    _invertedIndexLengthBufferFile = getDefaultFile(indexDir, columnName, ext 
+ INVERTED_INDEX_LENGTH_BUFFER_SUFFIX);
+    _singleValue = singleValue;
     _cardinality = cardinality;
     _numDocs = numDocs;
     _numValues = _singleValue ? numDocs : numValues;
@@ -116,12 +156,27 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements DictionaryBasedI
     }
   }
 
+  public static File getDefaultFile(File indexDir, String columnName, String 
extension) {
+    return new File(indexDir, columnName + extension);
+  }
+
   @Override
   public void add(int dictId) {
     putInt(_forwardIndexValueBuffer, _nextDocId++, dictId);
     putInt(_invertedIndexLengthBuffer, dictId, 
getInt(_invertedIndexLengthBuffer, dictId) + 1);
   }
 
+  public void add(IntIterator dictIds) {
+    int added = 0;
+    while (dictIds.hasNext()) {
+      int dictId = dictIds.nextInt();
+      putInt(_forwardIndexValueBuffer, _nextValueId++, dictId);
+      putInt(_invertedIndexLengthBuffer, dictId, 
getInt(_invertedIndexLengthBuffer, dictId) + 1);
+      added++;
+    }
+    putInt(_forwardIndexLengthBuffer, _nextDocId++, added);
+  }
+
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
@@ -132,8 +187,7 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements DictionaryBasedI
     putInt(_forwardIndexLengthBuffer, _nextDocId++, length);
   }
 
-  @Override
-  public void seal()
+  private void invert()
       throws IOException {
     // Calculate value index for each dictId in the inverted index value buffer
     // Re-use inverted index length buffer to store the value index for each 
dictId, where value index is the index in
@@ -176,9 +230,12 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements DictionaryBasedI
       destroyBuffer(_forwardIndexLengthBuffer, _forwardIndexLengthBufferFile);
       _forwardIndexLengthBuffer = null;
     }
+  }
 
+  private void write(FileChannel channel)
+      throws IOException {
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (BitmapInvertedIndexWriter writer = new 
BitmapInvertedIndexWriter(_invertedIndexFile, _cardinality)) {
+    try (BitmapInvertedIndexWriter writer = new 
BitmapInvertedIndexWriter(channel, _cardinality, false)) {
       RoaringBitmapWriter<RoaringBitmap> bitmapWriter = 
RoaringBitmapWriter.writer().get();
       int startIndex = 0;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
@@ -193,6 +250,20 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements DictionaryBasedI
     }
   }
 
+  @Override
+  public void seal()
+      throws IOException {
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      seal(channel);
+    }
+  }
+
+  public void seal(FileChannel channel)
+      throws IOException {
+    invert();
+    write(channel);
+  }
+
   @Override
   public void close()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to