adarshsanjeev commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1851872113
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
closedForWrite = true;
+ internalSerializer = new InternalSerializer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ /**
+ * Internal serializer used to serialize a {@link VariantColumn}. Contains
the logic to write out the column to a
+ * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it
is closed for writes.
+ */
+ public static class InternalSerializer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public InternalSerializer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
Review Comment:
Changed in the follow-up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]