clintropolis commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1851764485
##########
processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java:
##########
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
private final FixedIndexed<Double> doubleDictionary;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final ColumnarDoubles valueColumn;
- private final ImmutableBitmap nullValueIndex;
+ private final ImmutableBitmap nullValueBitmap;
Review Comment:
I named them `nullValueIndex` because this bitmap comes from the column's
bitmap value indexes; the selectors only need the null value index so that they
can implement the `isNull` method of the numeric selectors. It's not a big deal
to rename them, I guess — the old name just reflected where the bitmap came
from.
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
closedForWrite = true;
+ internalSerializer = new InternalSerializer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ /**
+ * Internal serializer used to serialize a {@link VariantColumn}. Contains
the logic to write out the column to a
+ * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it
is closed for writes.
+ */
+ public static class InternalSerializer implements Serializer
Review Comment:
It still isn't completely clear from the javadocs why this needs to exist. I
_think_ the reason is related to the intermediate column writer stuff (which I
don't actually think needs to exist either), but the javadoc doesn't make that clear.
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,246 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
closedForWrite = true;
+ internalSerializer = new InternalSerializer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ /**
+ * Internal serializer used to serialize a {@link VariantColumn}. Contains
the logic to write out the column to a
+ * {@link FileSmoosher}. Created by {@link VariantColumnSerializer} once it
is closed for writes.
+ */
+ public static class InternalSerializer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public InternalSerializer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
Review Comment:
These sortedness checks should not be performed unless `writeDictionary` is set.
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,241 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
+ internalSerializer = new VariantColumnSerializerizer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ public static class VariantColumnSerializerizer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public VariantColumnSerializerizer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
+ for (boolean sorted : dictionariesSorted) {
+ if (writeDictionary && !sorted) {
+ throw DruidException.defensive(
+ "Dictionary is not sorted? [%s] Should always be sorted",
+ Arrays.toString(dictionariesSorted)
+ );
}
}
Review Comment:
I meant that, since the error message isn't specific to any one dictionary,
there's no need to build an array of all the booleans and iterate over it.
Instead, just combine the booleans with `&&` and throw the exception if the
result is false, e.g.
`DruidException.conditionalDefensive(dictionaryWriter.isSorted() &&
longDictionaryWriter.isSorted() && ...)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]