Jackie-Jiang commented on a change in pull request #6320:
URL: https://github.com/apache/incubator-pinot/pull/6320#discussion_r536887337



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -19,18 +19,25 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
+
 import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.CleanerUtil;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.roaringbitmap.RoaringBitmap;
+import org.roaringbitmap.RoaringBitmapWriter;
+
+import static java.lang.Integer.reverseBytes;

Review comment:
       Don't use static imports in production classes

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];

Review comment:
       For the off-heap creator, we don't want to keep all bitmaps on heap. We 
should try to create and serialize the bitmaps one by one. Not sure about the 
cost of mapping a buffer per bitmap, though

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
bitmapOffset).order(LITTLE_ENDIAN);

Review comment:
       The offset buffer does not have to be LE, as all the values are written 
as BE

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;

Review comment:
       (nit) we don't usually use final for local variables

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];

Review comment:
       Do `RoaringBitmap` and `MutableRoaringBitmap` serialize to the same 
bytes?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];
+      RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+              .initialCapacity(((_nextDocId - 1) >>> 16) / _cardinality).get();

Review comment:
       Why do you need to divide it by `_cardinality`? Shouldn't this be 
`.expectedRange(0, _nextDocId)` instead?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
##########
@@ -181,33 +188,51 @@ public void seal()
     }
 
     // Create bitmaps from inverted index buffers and serialize them to file
-    try (DataOutputStream offsetDataStream = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)));
-        FileOutputStream bitmapFileStream = new 
FileOutputStream(_invertedIndexFile);
-        DataOutputStream bitmapDataStream = new DataOutputStream(new 
BufferedOutputStream(bitmapFileStream))) {
-      int bitmapOffset = (_cardinality + 1) * Integer.BYTES;
-      offsetDataStream.writeInt(bitmapOffset);
-      bitmapFileStream.getChannel().position(bitmapOffset);
-
+    ByteBuffer offsetBuffer = null;
+    ByteBuffer bitmapBuffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      // map the offsets buffer
+      final int startOfBitmaps = (_cardinality + 1) * Integer.BYTES;
+      int bitmapOffset = startOfBitmaps;
+      offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
bitmapOffset).order(LITTLE_ENDIAN);
+      offsetBuffer.putInt(reverseBytes(bitmapOffset));
+      RoaringBitmap[] bitmaps = new RoaringBitmap[_cardinality];
+      RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer()
+              .initialCapacity(((_nextDocId - 1) >>> 16) / _cardinality).get();
       int startIndex = 0;
       for (int dictId = 0; dictId < _cardinality; dictId++) {
-        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
         int endIndex = getInt(_invertedIndexLengthBuffer, dictId);
         for (int i = startIndex; i < endIndex; i++) {
-          bitmap.add(getInt(_invertedIndexValueBuffer, i));
+          writer.add(getInt(_invertedIndexValueBuffer, i));
         }
-        startIndex = endIndex;
-
-        // Write offset and bitmap into file
-        bitmapOffset += bitmap.serializedSizeInBytes();
+        bitmaps[dictId] = writer.get();
+        writer.reset();
+        int serializedSize = bitmaps[dictId].serializedSizeInBytes();
+        bitmapOffset += serializedSize;
         // Check for int overflow
         Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s 
exceeds 2GB limit", _invertedIndexFile);
-        offsetDataStream.writeInt(bitmapOffset);
-        bitmap.serialize(bitmapDataStream);
+        // write offset into file
+        offsetBuffer.putInt(reverseBytes(bitmapOffset));
+        startIndex = endIndex;
+      }
+      // we know how long the file should be now, so can map it
+      bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 
startOfBitmaps, bitmapOffset - startOfBitmaps);
+      for (RoaringBitmap bitmap : bitmaps) {
+        bitmap.serialize(bitmapBuffer);
       }
     } catch (Exception e) {
       FileUtils.deleteQuietly(_invertedIndexFile);
       throw e;
+    } finally {
+      if (CleanerUtil.UNMAP_SUPPORTED) {
+        CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner();
+        if (null != offsetBuffer) {

Review comment:
       (nit) same for other places
   ```suggestion
           if (offsetBuffer != null) {
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
##########
@@ -19,68 +19,85 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
 
 /**
  * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses 
on-heap memory.
  */
 public final class OnHeapBitmapInvertedIndexCreator implements 
DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
-  private final MutableRoaringBitmap[] _bitmaps;
+  private final RoaringBitmapWriter<MutableRoaringBitmap>[] _bitmapWriters;
   private int _nextDocId;
 
+  @SuppressWarnings("unchecked")
   public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, 
int cardinality) {
     _invertedIndexFile = new File(indexDir, columnName + 
V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _bitmaps = new MutableRoaringBitmap[cardinality];
+    _bitmapWriters = new RoaringBitmapWriter[cardinality];
     for (int i = 0; i < cardinality; i++) {
-      _bitmaps[i] = new MutableRoaringBitmap();
+      _bitmapWriters[i] = RoaringBitmapWriter.bufferWriter().get();
     }
   }
 
   @Override
   public void add(int dictId) {
-    _bitmaps[dictId].add(_nextDocId++);
+    _bitmapWriters[dictId].add(_nextDocId++);
   }
 
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
-      _bitmaps[dictIds[i]].add(_nextDocId);
+      _bitmapWriters[dictIds[i]].add(_nextDocId);
     }
     _nextDocId++;
   }
 
   @Override
   public void seal()
       throws IOException {
-    try (DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+    // calculate file size
+    int size = (_bitmapWriters.length + 1) * Integer.BYTES;
+    for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+      size += writer.get().serializedSizeInBytes();
+      // Check for int overflow
+      Preconditions.checkState(size > 0, "Inverted index file: %s exceeds 2GB 
limit", _invertedIndexFile);
+    }
+    ByteBuffer buffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
size).order(LITTLE_ENDIAN);
       // Write bitmap offsets
-      int bitmapOffset = (_bitmaps.length + 1) * Integer.BYTES;
-      out.writeInt(bitmapOffset);
-      for (MutableRoaringBitmap bitmap : _bitmaps) {
-        bitmapOffset += bitmap.serializedSizeInBytes();
-        // Check for int overflow
-        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s 
exceeds 2GB limit", _invertedIndexFile);
-        out.writeInt(bitmapOffset);
+      int bitmapOffset = (_bitmapWriters.length + 1) * Integer.BYTES;
+      buffer.putInt(Integer.reverseBytes(bitmapOffset));
+      for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+        bitmapOffset += writer.getUnderlying().serializedSizeInBytes();

Review comment:
       Do you need to `flush()` before `getUnderlying()`? Or just use `get()` 
to retrieve the bitmap?

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
##########
@@ -19,68 +19,85 @@
 package org.apache.pinot.core.segment.creator.impl.inv;
 
 import com.google.common.base.Preconditions;
-import java.io.BufferedOutputStream;
-import java.io.DataOutputStream;
+
+
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.util.CleanerUtil;
+import org.roaringbitmap.RoaringBitmapWriter;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
+import static java.nio.ByteOrder.LITTLE_ENDIAN;
+
 
 /**
  * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses 
on-heap memory.
  */
 public final class OnHeapBitmapInvertedIndexCreator implements 
DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
-  private final MutableRoaringBitmap[] _bitmaps;
+  private final RoaringBitmapWriter<MutableRoaringBitmap>[] _bitmapWriters;
   private int _nextDocId;
 
+  @SuppressWarnings("unchecked")
   public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, 
int cardinality) {
     _invertedIndexFile = new File(indexDir, columnName + 
V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
-    _bitmaps = new MutableRoaringBitmap[cardinality];
+    _bitmapWriters = new RoaringBitmapWriter[cardinality];
     for (int i = 0; i < cardinality; i++) {
-      _bitmaps[i] = new MutableRoaringBitmap();
+      _bitmapWriters[i] = RoaringBitmapWriter.bufferWriter().get();
     }
   }
 
   @Override
   public void add(int dictId) {
-    _bitmaps[dictId].add(_nextDocId++);
+    _bitmapWriters[dictId].add(_nextDocId++);
   }
 
   @Override
   public void add(int[] dictIds, int length) {
     for (int i = 0; i < length; i++) {
-      _bitmaps[dictIds[i]].add(_nextDocId);
+      _bitmapWriters[dictIds[i]].add(_nextDocId);
     }
     _nextDocId++;
   }
 
   @Override
   public void seal()
       throws IOException {
-    try (DataOutputStream out = new DataOutputStream(
-        new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) {
+    // calculate file size
+    int size = (_bitmapWriters.length + 1) * Integer.BYTES;
+    for (RoaringBitmapWriter<MutableRoaringBitmap> writer : _bitmapWriters) {
+      size += writer.get().serializedSizeInBytes();
+      // Check for int overflow
+      Preconditions.checkState(size > 0, "Inverted index file: %s exceeds 2GB 
limit", _invertedIndexFile);
+    }
+    ByteBuffer buffer = null;
+    try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, 
"rw").getChannel()) {
+      buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 
size).order(LITTLE_ENDIAN);

Review comment:
       Maybe keep 2 buffers, similar to the off-heap one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to