adarshsanjeev commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1851871834


##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector<? extends 
StructuredData> selector) th
     }
   }
 
-  private void closeForWrite()
+  private void closeForWrite() throws IOException
   {
     if (!closedForWrite) {
-      columnNameBytes = computeFilenameBytes();
+      // write out compressed dictionaryId int column, bitmap indexes, and 
array element bitmap indexes
+      // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
+      // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
+      // build the bitmap indexes here instead of doing both things
+      String filenameBase = StringUtils.format("%s.forward_dim", name);
+      final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+                                    + dictionaryIdLookup.getLongCardinality()
+                                    + 
dictionaryIdLookup.getDoubleCardinality();
+      final int cardinality = scalarCardinality + 
dictionaryIdLookup.getArrayCardinality();
+      final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
+      final CompressionStrategy compressionToUse;
+      if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
+        compressionToUse = compression;
+      } else {
+        compressionToUse = CompressionStrategy.LZ4;
+      }
+
+      final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
+          name,
+          segmentWriteOutMedium,
+          filenameBase,
+          cardinality,
+          compressionToUse,
+          segmentWriteOutMedium.getCloser()
+      );
+      encodedValueSerializer.open();
+
+      final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name,
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      bitmapIndexWriter.open();
+      bitmapIndexWriter.setObjectsNotSorted();
+      final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+      final MutableBitmap[] arrayElements = new 
MutableBitmap[scalarCardinality];
+      for (int i = 0; i < bitmaps.length; i++) {
+        bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+      }
+      final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = 
new GenericIndexedWriter<>(
+          segmentWriteOutMedium,
+          name + "_arrays",
+          indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+      );
+      arrayElementIndexWriter.open();
+      arrayElementIndexWriter.setObjectsNotSorted();
+
+      final IntIterator rows = intermediateValueWriter.getIterator();
+      int rowCount = 0;
+      final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+                              + dictionaryIdLookup.getLongCardinality()
+                              + dictionaryIdLookup.getDoubleCardinality();
+      while (rows.hasNext()) {
+        final int dictId = rows.nextInt();
+        encodedValueSerializer.addValue(dictId);
+        bitmaps[dictId].add(rowCount);
+        if (dictId >= arrayBaseId) {
+          int[] array = dictionaryIdLookup.getArrayValue(dictId);
+          for (int elementId : array) {
+            MutableBitmap bitmap = arrayElements[elementId];
+            if (bitmap == null) {
+              bitmap = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+              arrayElements[elementId] = bitmap;
+            }
+            bitmap.add(rowCount);
+          }
+        }
+        rowCount++;
+      }
+
+      for (int i = 0; i < bitmaps.length; i++) {
+        final MutableBitmap bitmap = bitmaps[i];
+        bitmapIndexWriter.write(
+            
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+        );
+        bitmaps[i] = null; // Reclaim memory
+      }
+      if (writeDictionary) {
+        for (int i = 0; i < arrayElements.length; ++i) {
+          if (arrayElements[i] != null) {
+            arrayElementDictionaryWriter.write(i);
+            arrayElementIndexWriter.write(arrayElements[i]);
+          }
+        }
+      }
+
       closedForWrite = true;
+      internalSerializer = new InternalSerializer(
+          name,
+          variantTypeSetByte,
+          dictionaryWriter,
+          longDictionaryWriter,
+          doubleDictionaryWriter,
+          arrayDictionaryWriter,
+          encodedValueSerializer,
+          bitmapIndexWriter,
+          arrayElementDictionaryWriter,
+          arrayElementIndexWriter,
+          dictionaryIdLookup,
+          writeDictionary
+      );
     }
   }
 
   @Override
-  public long getSerializedSize()
+  public long getSerializedSize() throws IOException
   {
     closeForWrite();
-
-    long size = 1 + columnNameBytes.capacity();
-    // the value dictionaries, raw column, and null index are all stored in 
separate files
-    if (variantTypeSetByte != null) {
-      size += 1;
-    }
-    return size;
+    return internalSerializer.getSerializedSize();
   }
 
   @Override
-  public void writeTo(
-      WritableByteChannel channel,
-      FileSmoosher smoosher
-  ) throws IOException
+  public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) 
throws IOException
   {
-    Preconditions.checkState(closedForWrite, "Not closed yet!");
-    if (writeDictionary) {
-      Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not 
sorted?!?");
-    }
-
-    // write out compressed dictionaryId int column, bitmap indexes, and array 
element bitmap indexes
-    // by iterating intermediate value column the intermediate value column 
should be replaced someday by a cooler
-    // compressed int column writer that allows easy iteration of the values 
it writes out, so that we could just
-    // build the bitmap indexes here instead of doing both things
-    String filenameBase = StringUtils.format("%s.forward_dim", name);
-    final int cardinality = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality()
-                            + dictionaryIdLookup.getArrayCardinality();
-    final CompressionStrategy compression = 
indexSpec.getDimensionCompression();
-    final CompressionStrategy compressionToUse;
-    if (compression != CompressionStrategy.UNCOMPRESSED && compression != 
CompressionStrategy.NONE) {
-      compressionToUse = compression;
-    } else {
-      compressionToUse = CompressionStrategy.LZ4;
-    }
-
-    final SingleValueColumnarIntsSerializer encodedValueSerializer = 
CompressedVSizeColumnarIntsSerializer.create(
-        name,
-        segmentWriteOutMedium,
-        filenameBase,
-        cardinality,
-        compressionToUse,
-        segmentWriteOutMedium.getCloser()
-    );
-    encodedValueSerializer.open();
+    closeForWrite();
+    internalSerializer.writeTo(channel, smoosher);
+  }
 
-    final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name,
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    bitmapIndexWriter.open();
-    bitmapIndexWriter.setObjectsNotSorted();
-    final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
-    final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new 
Int2ObjectRBTreeMap<>();
-    for (int i = 0; i < bitmaps.length; i++) {
-      bitmaps[i] = 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
-    }
-    final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new 
GenericIndexedWriter<>(
-        segmentWriteOutMedium,
-        name + "_arrays",
-        indexSpec.getBitmapSerdeFactory().getObjectStrategy()
-    );
-    arrayElementIndexWriter.open();
-    arrayElementIndexWriter.setObjectsNotSorted();
-
-    final IntIterator rows = intermediateValueWriter.getIterator();
-    int rowCount = 0;
-    final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
-                            + dictionaryIdLookup.getLongCardinality()
-                            + dictionaryIdLookup.getDoubleCardinality();
-    while (rows.hasNext()) {
-      final int dictId = rows.nextInt();
-      encodedValueSerializer.addValue(dictId);
-      bitmaps[dictId].add(rowCount);
-      if (dictId >= arrayBaseId) {
-        int[] array = dictionaryIdLookup.getArrayValue(dictId);
-        for (int elementId : array) {
-          arrayElements.computeIfAbsent(
-              elementId,
-              (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
-          ).add(rowCount);
+  /**
+   * Internal serializer used to serailize a {@link VariantColumn}. Contains 
the logic to write out the column to a
+   * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it 
is closed for writes.
+   */
+  public static class InternalSerializer implements Serializer

Review Comment:
   It's essentially meant to be reused, since it contains only the code to write 
out the column. I've added that detail to the PR as well. Are there any other 
details that should be included in the javadoc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to