clintropolis commented on code in PR #17493:
URL: https://github.com/apache/druid/pull/17493#discussion_r1851079260
##########
processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java:
##########
@@ -349,6 +349,12 @@ public static byte[] toUtf8Nullable(@Nullable final String
string)
return toUtf8(string);
}
+ @Nullable
+ public static String nullableValueOf(@Nullable final Object obj)
+ {
+ return obj == null ? null : obj.toString();
Review Comment:
there is also `Evals.asString` which does this same thing, so i don't think
we need to add this new method
##########
processing/src/main/java/org/apache/druid/segment/data/CompressedVSizeColumnarIntsSupplier.java:
##########
@@ -308,7 +308,7 @@ protected int _get(ByteBuffer buffer, boolean bigEngian,
int bufferIndex)
}
}
- private class CompressedVSizeColumnarInts implements ColumnarInts
+ public class CompressedVSizeColumnarInts implements ColumnarInts
Review Comment:
why does only this class need to be public and not all of the subclasses in
the same file?
##########
processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java:
##########
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
private final FixedIndexed<Double> doubleDictionary;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final ColumnarDoubles valueColumn;
- private final ImmutableBitmap nullValueIndex;
+ private final ImmutableBitmap nullValueBitmap;
Review Comment:
why the name change?
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,241 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
+ internalSerializer = new VariantColumnSerializerizer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ public static class VariantColumnSerializerizer implements Serializer
Review Comment:
why do we need another `Serializer` in our `Serializer`? If we really need
it, give it a less confusing name like `InternalSerializer` and please add
javadocs for why this needs to exist
##########
processing/src/main/java/org/apache/druid/segment/nested/NestedCommonFormatColumnSerializer.java:
##########
@@ -76,10 +76,15 @@ public abstract void serializeDictionaries(
protected void writeInternal(FileSmoosher smoosher, Serializer serializer,
String fileName) throws IOException
{
- ColumnSerializerUtils.writeInternal(smoosher, serializer, getColumnName(),
fileName);
+ writeInternal(smoosher, serializer, getColumnName(), fileName);
}
- protected void copyFromTempSmoosh(FileSmoosher smoosher, SmooshedFileMapper
fileMapper) throws IOException
+ protected static void writeInternal(FileSmoosher smoosher, Serializer
serializer, String columnName, String fileName) throws IOException
+ {
+ ColumnSerializerUtils.writeInternal(smoosher, serializer, columnName,
fileName);
+ }
+
Review Comment:
I don't think this method should be static; the reason it was here was so that
callers do not have to pass the column name argument. Anything that does need
to pass a column name argument should instead use
`ColumnSerializerUtils.writeInternal` directly.
##########
processing/src/main/java/org/apache/druid/segment/column/StringUtf8DictionaryEncodedColumn.java:
##########
@@ -135,6 +139,11 @@ public int getCardinality()
return utf8Dictionary.size();
}
+ public BitmapFactory getBitmapFactory()
Review Comment:
maybe add javadocs for why this needs to exist? I don't fully understand it
myself, since it seems odd for this to be on columns when it seems like it is
set at the segment level
##########
processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumn.java:
##########
@@ -50,18 +53,24 @@ public class ScalarDoubleColumn implements
NestedCommonFormatColumn
SemanticUtils.makeAsMap(ScalarDoubleColumn.class);
private final FixedIndexed<Double> doubleDictionary;
+ private final Supplier<ColumnarInts> encodedValuesSupplier;
private final ColumnarDoubles valueColumn;
- private final ImmutableBitmap nullValueIndex;
+ private final ImmutableBitmap nullValueBitmap;
+ private final BitmapFactory bitmapFactory;
public ScalarDoubleColumn(
FixedIndexed<Double> doubleDictionary,
+ Supplier<ColumnarInts> encodedValuesSupplier,
Review Comment:
why is this passed in? It doesn't seem used
##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataComplexTypeSerde.java:
##########
@@ -150,6 +135,53 @@ public boolean readRetainsBufferReference()
};
}
+ public static StructuredData deserializeBuffer(ByteBuffer buf)
Review Comment:
it seems worth adding javadocs for all of these methods, since they expect the
buffer's position and limit to be set and the buffer to contain only a single
`StructuredData` (which is fine for current callers, but feels like it should
be called out)
##########
processing/src/main/java/org/apache/druid/segment/IndexSpec.java:
##########
@@ -43,7 +43,7 @@
*/
public class IndexSpec
{
- public static IndexSpec DEFAULT = IndexSpec.builder().build();
+ public static final IndexSpec DEFAULT = IndexSpec.builder().build();
Review Comment:
oops, thanks for adding final :+1:
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,241 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
+ internalSerializer = new VariantColumnSerializerizer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ public static class VariantColumnSerializerizer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public VariantColumnSerializerizer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
+ for (boolean sorted : dictionariesSorted) {
+ if (writeDictionary && !sorted) {
+ throw DruidException.defensive(
+ "Dictionary is not sorted? [%s] Should always be sorted",
+ Arrays.toString(dictionariesSorted)
+ );
}
}
Review Comment:
this is strange, why make an array and then loop over the array? Done like
this, there is no indication of which dictionary wasn't sorted, so it doesn't
really add anything; why not just use `dictionaryWriter.isSorted() &&
longDictionaryWriter.isSorted() && ...`
##########
processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java:
##########
@@ -128,6 +135,12 @@ public static ScalarDoubleColumnAndIndexSupplier read(
);
}
+ final CompressedVSizeColumnarIntsSupplier encodedCol =
CompressedVSizeColumnarIntsSupplier.fromByteBuffer(
+ encodedValuesBuffer,
+ byteOrder,
+ columnBuilder.getFileMapper()
+ );
Review Comment:
seems unfortunate we have to read this since we aren't using it anywhere,
which is why it was left out previously
##########
processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java:
##########
@@ -299,155 +301,241 @@ public void serialize(ColumnValueSelector<? extends
StructuredData> selector) th
}
}
- private void closeForWrite()
+ private void closeForWrite() throws IOException
{
if (!closedForWrite) {
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
+ // write out compressed dictionaryId int column, bitmap indexes, and
array element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int scalarCardinality = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ +
dictionaryIdLookup.getDoubleCardinality();
+ final int cardinality = scalarCardinality +
dictionaryIdLookup.getArrayCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse,
+ segmentWriteOutMedium.getCloser()
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final MutableBitmap[] arrayElements = new
MutableBitmap[scalarCardinality];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter =
new GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
+ + dictionaryIdLookup.getLongCardinality()
+ + dictionaryIdLookup.getDoubleCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = dictionaryIdLookup.getArrayValue(dictId);
+ for (int elementId : array) {
+ MutableBitmap bitmap = arrayElements[elementId];
+ if (bitmap == null) {
+ bitmap =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ arrayElements[elementId] = bitmap;
+ }
+ bitmap.add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ if (writeDictionary) {
+ for (int i = 0; i < arrayElements.length; ++i) {
+ if (arrayElements[i] != null) {
+ arrayElementDictionaryWriter.write(i);
+ arrayElementIndexWriter.write(arrayElements[i]);
+ }
+ }
+ }
+
+ internalSerializer = new VariantColumnSerializerizer(
+ name,
+ variantTypeSetByte,
+ dictionaryWriter,
+ longDictionaryWriter,
+ doubleDictionaryWriter,
+ arrayDictionaryWriter,
+ encodedValueSerializer,
+ bitmapIndexWriter,
+ arrayElementDictionaryWriter,
+ arrayElementIndexWriter,
+ dictionaryIdLookup,
+ writeDictionary
+ );
}
}
@Override
- public long getSerializedSize()
+ public long getSerializedSize() throws IOException
{
closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- if (variantTypeSetByte != null) {
- size += 1;
- }
- return size;
+ return internalSerializer.getSerializedSize();
}
@Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
+ public void writeTo(WritableByteChannel channel, FileSmoosher smoosher)
throws IOException
{
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- if (writeDictionary) {
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
- }
-
- // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
- // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
- // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
- // build the bitmap indexes here instead of doing both things
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality()
- + dictionaryIdLookup.getArrayCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
-
- final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse,
- segmentWriteOutMedium.getCloser()
- );
- encodedValueSerializer.open();
+ closeForWrite();
+ internalSerializer.writeTo(channel, smoosher);
+ }
- final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
- final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
-
- final IntIterator rows = intermediateValueWriter.getIterator();
- int rowCount = 0;
- final int arrayBaseId = dictionaryIdLookup.getStringCardinality()
- + dictionaryIdLookup.getLongCardinality()
- + dictionaryIdLookup.getDoubleCardinality();
- while (rows.hasNext()) {
- final int dictId = rows.nextInt();
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- if (dictId >= arrayBaseId) {
- int[] array = dictionaryIdLookup.getArrayValue(dictId);
- for (int elementId : array) {
- arrayElements.computeIfAbsent(
- elementId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
+ public static class VariantColumnSerializerizer implements Serializer
+ {
+ private final String columnName;
+ private final ByteBuffer columnNameBytes;
+ private final Byte variantTypeSetByte;
+
+ private final DictionaryWriter<String> dictionaryWriter;
+ private final FixedIndexedWriter<Long> longDictionaryWriter;
+ private final FixedIndexedWriter<Double> doubleDictionaryWriter;
+ private final FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
+
+ private final SingleValueColumnarIntsSerializer encodedValueSerializer;
+ private final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
+
+ private final FixedIndexedIntWriter arrayElementDictionaryWriter;
+ private final GenericIndexedWriter<ImmutableBitmap>
arrayElementIndexWriter;
+ private final boolean writeDictionary;
+
+ private final DictionaryIdLookup dictionaryIdLookup;
+
+ public VariantColumnSerializerizer(
+ String columnName,
+ Byte variantTypeSetByte,
+ DictionaryWriter<String> dictionaryWriter,
+ FixedIndexedWriter<Long> longDictionaryWriter,
+ FixedIndexedWriter<Double> doubleDictionaryWriter,
+ FrontCodedIntArrayIndexedWriter arrayDictionaryWriter,
+ SingleValueColumnarIntsSerializer encodedValueSerializer,
+ GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter,
+ FixedIndexedIntWriter arrayElementDictionaryWriter,
+ GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter,
+ DictionaryIdLookup dictionaryIdLookup,
+ boolean writeDictionary
+ )
+ {
+ this.columnName = columnName;
+ this.columnNameBytes =
ColumnSerializerUtils.stringToUtf8InVSizeByteBuffer(columnName);
+ this.variantTypeSetByte = variantTypeSetByte;
+ this.dictionaryWriter = dictionaryWriter;
+ this.longDictionaryWriter = longDictionaryWriter;
+ this.doubleDictionaryWriter = doubleDictionaryWriter;
+ this.arrayDictionaryWriter = arrayDictionaryWriter;
+ this.encodedValueSerializer = encodedValueSerializer;
+ this.bitmapIndexWriter = bitmapIndexWriter;
+ this.arrayElementDictionaryWriter = arrayElementDictionaryWriter;
+ this.arrayElementIndexWriter = arrayElementIndexWriter;
+ this.writeDictionary = writeDictionary;
+ this.dictionaryIdLookup = dictionaryIdLookup;
+
+ boolean[] dictionariesSorted = new boolean[]{
+ dictionaryWriter.isSorted(),
+ longDictionaryWriter.isSorted(),
+ doubleDictionaryWriter.isSorted(),
+ arrayDictionaryWriter.isSorted()
+ };
+ for (boolean sorted : dictionariesSorted) {
+ if (writeDictionary && !sorted) {
+ throw DruidException.defensive(
+ "Dictionary is not sorted? [%s] Should always be sorted",
+ Arrays.toString(dictionariesSorted)
+ );
}
}
- rowCount++;
}
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- if (writeDictionary) {
- for (Int2ObjectMap.Entry<MutableBitmap> arrayElement :
arrayElements.int2ObjectEntrySet()) {
- arrayElementDictionaryWriter.write(arrayElement.getIntKey());
- arrayElementIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
- );
+ @Override
+ public long getSerializedSize()
+ {
+ long size = 1 + columnNameBytes.capacity();
+ // the value dictionaries, raw column, and null index are all stored in
separate files
Review Comment:
nit: I know this isn't new, but this comment is kind of stale and was copied
from the nested column serializer. The variant column doesn't have a raw
column, and instead of the 'null index' that the top-level nested column has,
it should mention that "value dictionaries and indexes, array element indexes,
and the dictionary id column are stored in separate files" or something similar
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]