adarshsanjeev commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1851872113


##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector<? extends 
StructuredData> selector) th
     }
   }
 
-  private void closeForWrite()
+  private void closeForWrite() throws IOException
   {
     if (!closedForWrite) {
-      columnNameBytes = computeFilenameBytes();
+      // write out compressed dictionaryId int column, bitmap indexes, and 
array element bitmap indexes
+      // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
+      // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
+      // build the bitmap indexes here instead of doing both things
+      String filenameBase = StringUtils.format("%s.forward_dim", name);
+      final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+                                    + dictionaryIdLookup.getLongCardinality()
+                                    + 
dictionaryIdLookup.getDoubleCardinality();
+      final int cardinality = scalarCardinality + 
dictionaryIdLookup.getArrayCardinality();
+      final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
+      final CompressionStrategy compressionToUse;
+      if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
+        compressionToUse = compression;
+      } else {
+        compressionToUse = CompressionStrategy.LZ4;
+      }
+
+      final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
+          name,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionToUse,
+          segmentWriteOutMedium.getCloser()
+      );
+      encodedValueSerializer.open();
+
+      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name,
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      bitmapIndexWriter.open();
+      bitmapIndexWriter.setObjectsNotSorted();
+      final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+      final MutableBitmap[] arrayElements = new 
MutableBitmap[scalarCardinality];
+      for (int i = 0; i < bitmaps.length; i++) {
+        bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+      }
+      final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = 
new GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name + "_arrays",
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      arrayElementIndexWriter.open();
+      arrayElementIndexWriter.setObjectsNotSorted();
+
+      final IntIterator rows = intermediateValueWriter.getIterator();
+      int rowCount = 0;
+      final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+                              + dictionaryIdLookup.getLongCardinality()
+                              + dictionaryIdLookup.getDoubleCardinality();
+      while (rows.hasNext()) {
+        final int dictId = rows.nextInt();
+        encodedValueSerializer.addValue(dictId);
+        bitmaps[dictId].add(rowCount);
+        if (dictId >= arrayBaseId) {
+          int[] array = dictionaryIdLookup.getArrayValue(dictId);
+          for (int elementId : array) {
+            MutableBitmap bitmap = arrayElements[elementId];
+            if (bitmap == null) {
+              bitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+              arrayElements[elementId] = bitmap;
+            }
+            bitmap.add(rowCount);
+          }
+        }
+        rowCount++;
+      }
+
+      for (int i = 0; i < bitmaps.length; i++) {
+        final MutableBitmap bitmap = bitmaps[i];
+        bitmapIndexWriter.write(
+            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+        );
+        bitmaps[i] = null; // Reclaim memory
+      }
+      if (writeDictionary) {
+        for (int i = 0; i < arrayElements.length; ++i) {
+          if (arrayElements[i] != null) {
+            arrayElementDictionaryWriter.write(i);
+            arrayElementIndexWriter.write(arrayElements[i]);
+          }
+        }
+      }
+
       closedForWrite = true;
+      internalSerializer = new InternalSerializer(
+          name,
+          variantTypeSetByte,
+          dictionaryWriter,
+          longDictionaryWriter,
+          doubleDictionaryWriter,
+          arrayDictionaryWriter,
+          encodedValueSerializer,
+          bitmapIndexWriter,
+          arrayElementDictionaryWriter,
+          arrayElementIndexWriter,
+          dictionaryIdLookup,
+          writeDictionary
+      );
     }
   }
 
   @Override
-  public long getSerializedSize()
+  public long getSerializedSize() throws IOException
   {
     closeForWrite();
-
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    if (variantTypeSetByte != null) {
-      size += 1;
-    }
-    return size;
+    return internalSerializer.getSerializedSize();
   }
 
   @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
   {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-    if (writeDictionary) {
-      Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
-    }
-
-    // write out compressed dictionaryId int column, bitmap indexes, and array 
element bitmap indexes
-    // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
-    // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
-    // build the bitmap indexes here instead of doing both things
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final int cardinality = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality()
-                            + dictionaryIdLookup.getArrayCardinality();
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-
-    final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        cardinality,
-        compressionToUse,
-        segmentWriteOutMedium.getCloser()
-    );
-    encodedValueSerializer.open();
+    closeForWrite();
+    internalSerializer.writeTo(channel, smoosher);
+  }
 
-    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
-    final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-    final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name + "_arrays",
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    arrayElementIndexWriter.open();
-    arrayElementIndexWriter.setObjectsNotSorted();
-
-    final IntIterator rows = intermediateValueWriter.getIterator();
-    int rowCount = 0;
-    final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality();
-    while (rows.hasNext()) {
-      final int dictId = rows.nextInt();
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
-      if (dictId >= arrayBaseId) {
-        int[] array = dictionaryIdLookup.getArrayValue(dictId);
-        for (int elementId : array) {
-          arrayElements.computeIfAbsent(
-              elementId,
-              (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-          ).add(rowCount);
+  /**
+   * Internal serializer used to serialize a {@link VariantColumn}. Contains 
the logic to write out the column to a
+   * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it 
is closed for writes.
+   */
+  public static class InternalSerializer implements Serializer
+  {
+    private final String columnName;
+    private final ByteBuffer columnNameBytes;
+    private final Byte variantTypeSetByte;
+
+    private final DictionaryWriter<String> dictionaryWriter;
+    private final FixedIndexedWriter<Long> longDictionaryWriter;
+    private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+    private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+    private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+    private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+    private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+    private final GenericIndexedWriter<ImmutableBitmap> 
arrayElementIndexWriter;
+    private final boolean writeDictionary;
+
+    private final DictionaryIdLookup dictionaryIdLookup;
+
+    public InternalSerializer(
+        String columnName,
+        Byte variantTypeSetByte,
+        DictionaryWriter<String> dictionaryWriter,
+        FixedIndexedWriter<Long> longDictionaryWriter,
+        FixedIndexedWriter<Double> doubleDictionaryWriter,
+        FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+        SingleValueColumnarIntsSerializer encodedValueSerializer,
+        GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+        FixedIndexedIntWriter arrayElementDictionaryWriter,
+        GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+        DictionaryIdLookup dictionaryIdLookup,
+        boolean writeDictionary
+    )
+    {
+      this.columnName = columnName;
+      this.columnNameBytes = 
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+      this.variantTypeSetByte = variantTypeSetByte;
+      this.dictionaryWriter = dictionaryWriter;
+      this.longDictionaryWriter = longDictionaryWriter;
+      this.doubleDictionaryWriter = doubleDictionaryWriter;
+      this.arrayDictionaryWriter = arrayDictionaryWriter;
+      this.encodedValueSerializer = encodedValueSerializer;
+      this.bitmapIndexWriter = bitmapIndexWriter;
+      this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+      this.arrayElementIndexWriter = arrayElementIndexWriter;
+      this.writeDictionary = writeDictionary;
+      this.dictionaryIdLookup = dictionaryIdLookup;
+
+      boolean[] dictionariesSorted = new boolean[]{
+          dictionaryWriter.isSorted(),
+          longDictionaryWriter.isSorted(),
+          doubleDictionaryWriter.isSorted(),
+          arrayDictionaryWriter.isSorted()
+      };

Review Comment:
   Changed in the follow-up PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to