This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/27.0.0 by this push:
new 6eb2695831 reduce heap footprint of ingesting auto typed columns by
pushing compression and index generation into writeTo (#14615) (#14627)
6eb2695831 is described below
commit 6eb26958319774e71048b8334951368ee18b6366
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Jul 20 04:34:06 2023 -0700
reduce heap footprint of ingesting auto typed columns by pushing
compression and index generation into writeTo (#14615) (#14627)
---
.../apache/druid/query/filter/EqualityFilter.java | 8 +-
.../druid/segment/data/FixedIndexedWriter.java | 13 +-
.../segment/index/AllFalseBitmapColumnIndex.java | 12 +-
.../segment/index/IndexedUtf8ValueIndexes.java | 3 +-
...{ValueIndexes.java => ArrayElementIndexes.java} | 16 +-
.../druid/segment/index/semantic/ValueIndexes.java | 9 +-
.../nested/ScalarDoubleColumnAndIndexSupplier.java | 4 +-
.../nested/ScalarDoubleColumnSerializer.java | 172 +++--------------
.../nested/ScalarLongColumnAndIndexSupplier.java | 5 +-
.../segment/nested/ScalarLongColumnSerializer.java | 172 +++--------------
... ScalarNestedCommonFormatColumnSerializer.java} | 215 ++++++++++-----------
.../nested/ScalarStringColumnSerializer.java | 151 ++-------------
.../nested/VariantColumnAndIndexSupplier.java | 164 ++++++++++++----
.../segment/nested/VariantColumnSerializer.java | 174 +++++++++--------
.../nested/ScalarDoubleColumnSupplierTest.java | 3 +
.../nested/ScalarLongColumnSupplierTest.java | 3 +
.../nested/ScalarStringColumnSupplierTest.java | 3 +
.../segment/nested/VariantColumnSupplierTest.java | 18 ++
18 files changed, 458 insertions(+), 687 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
b/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
index 0e57b06141..ba6d17ccc4 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/EqualityFilter.java
@@ -99,11 +99,11 @@ public class EqualityFilter extends
AbstractOptimizableDimFilter implements Filt
throw InvalidInput.exception("Invalid equality filter on column [%s],
matchValueType cannot be null", column);
}
this.matchValueType = matchValueType;
- if (matchValue == null) {
- throw InvalidInput.exception("Invalid equality filter on column [%s],
matchValue cannot be null", column);
- }
this.matchValue = matchValue;
this.matchValueEval =
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(matchValueType),
matchValue);
+ if (matchValueEval.value() == null) {
+ throw InvalidInput.exception("Invalid equality filter on column [%s],
matchValue cannot be null", column);
+ }
this.filterTuning = filterTuning;
this.predicateFactory = new EqualityPredicateFactory(matchValueEval);
}
@@ -239,6 +239,8 @@ public class EqualityFilter extends
AbstractOptimizableDimFilter implements Filt
final ValueIndexes valueIndexes = indexSupplier.as(ValueIndexes.class);
if (valueIndexes != null) {
+ // matchValueEval.value() cannot be null here due to check in the
constructor
+ //noinspection DataFlowIssue
return valueIndexes.forValue(matchValueEval.value(), matchValueType);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
index 76944d2dc4..0146856657 100644
---
a/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
+++
b/processing/src/main/java/org/apache/druid/segment/data/FixedIndexedWriter.java
@@ -24,7 +24,6 @@ import org.apache.druid.io.Channels;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.segment.column.TypeStrategy;
-import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.apache.druid.segment.writeout.WriteOutBytes;
@@ -39,7 +38,7 @@ import java.util.Iterator;
/**
* Writer for a {@link FixedIndexed}
*/
-public class FixedIndexedWriter<T> implements Serializer
+public class FixedIndexedWriter<T> implements DictionaryWriter<T>
{
private static final int PAGE_SIZE = 4096;
private final SegmentWriteOutMedium segmentWriteOutMedium;
@@ -73,11 +72,19 @@ public class FixedIndexedWriter<T> implements Serializer
this.isSorted = isSorted;
}
+ @Override
+ public boolean isSorted()
+ {
+ return isSorted;
+ }
+
+ @Override
public void open() throws IOException
{
this.valuesOut = segmentWriteOutMedium.makeWriteOutBytes();
}
+ @Override
public int getCardinality()
{
return hasNulls ? numWritten + 1 : numWritten;
@@ -89,6 +96,7 @@ public class FixedIndexedWriter<T> implements Serializer
return Byte.BYTES + Byte.BYTES + Integer.BYTES + valuesOut.size();
}
+ @Override
public void write(@Nullable T objectToWrite) throws IOException
{
if (prevObject != null && isSorted && comparator.compare(prevObject,
objectToWrite) >= 0) {
@@ -140,6 +148,7 @@ public class FixedIndexedWriter<T> implements Serializer
}
@SuppressWarnings("unused")
+ @Override
@Nullable
public T get(int index) throws IOException
{
diff --git
a/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
b/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
index fb986c51d9..a7518d44c6 100644
---
a/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
+++
b/processing/src/main/java/org/apache/druid/segment/index/AllFalseBitmapColumnIndex.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment.index;
+import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.filter.ColumnIndexSelector;
import org.apache.druid.segment.column.ColumnIndexCapabilities;
@@ -26,11 +27,16 @@ import
org.apache.druid.segment.column.SimpleColumnIndexCapabilities;
public class AllFalseBitmapColumnIndex implements BitmapColumnIndex
{
- private final ColumnIndexSelector selector;
+ private final BitmapFactory bitmapFactory;
public AllFalseBitmapColumnIndex(ColumnIndexSelector indexSelector)
{
- this.selector = indexSelector;
+ this(indexSelector.getBitmapFactory());
+ }
+
+ public AllFalseBitmapColumnIndex(BitmapFactory bitmapFactory)
+ {
+ this.bitmapFactory = bitmapFactory;
}
@Override
@@ -48,6 +54,6 @@ public class AllFalseBitmapColumnIndex implements
BitmapColumnIndex
@Override
public <T> T computeBitmapResult(BitmapResultFactory<T> bitmapResultFactory)
{
- return
bitmapResultFactory.wrapAllFalse(selector.getBitmapFactory().makeEmptyImmutableBitmap());
+ return
bitmapResultFactory.wrapAllFalse(bitmapFactory.makeEmptyImmutableBitmap());
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
b/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
index 2510597470..143103cb62 100644
---
a/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
+++
b/processing/src/main/java/org/apache/druid/segment/index/IndexedUtf8ValueIndexes.java
@@ -37,6 +37,7 @@ import
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.Utf8ValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Comparator;
@@ -101,7 +102,7 @@ public final class IndexedUtf8ValueIndexes<TDictionary
extends Indexed<ByteBuffe
@Nullable
@Override
- public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType>
valueType)
+ public BitmapColumnIndex forValue(@Nonnull Object value,
TypeSignature<ValueType> valueType)
{
if (valueType.isPrimitive()) {
return forValue(
diff --git
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
similarity index 68%
copy from
processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
copy to
processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
index 28fcf0ae9b..5a8a9d841b 100644
---
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
+++
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ArrayElementIndexes.java
@@ -26,19 +26,19 @@ import org.apache.druid.segment.index.BitmapColumnIndex;
import javax.annotation.Nullable;
-public interface ValueIndexes
+public interface ArrayElementIndexes
{
-
/**
- * Get the {@link ImmutableBitmap} corresponding to the supplied value.
Generates an empty bitmap when passed a
- * value that doesn't exist. May return null if a value index cannot be
computed for the supplied value type.
+ * Get the {@link ImmutableBitmap} corresponding to rows with array elements
matching the supplied value. Generates
+ * an empty bitmap when passed a value that doesn't exist in any array. May
return null if a value index cannot be
+ * computed for the supplied value type.
*
- * @param value value to match
+ * @param value value to match against any array element in a row
* @param valueType type of the value to match, used to assist conversion
from the match value type to the column
* value type
- * @return {@link ImmutableBitmap} corresponding to the rows
which match the value, or null if an index
- * connot be computed for the supplied value type
+ * @return {@link ImmutableBitmap} corresponding to the rows with
array elements which match the value, or
+ null if an index cannot be computed for the supplied
value type
*/
@Nullable
- BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType);
+ BitmapColumnIndex containsValue(@Nullable Object value,
TypeSignature<ValueType> valueType);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
index 28fcf0ae9b..9eee56896d 100644
---
a/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
+++
b/processing/src/main/java/org/apache/druid/segment/index/semantic/ValueIndexes.java
@@ -24,14 +24,17 @@ import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.index.BitmapColumnIndex;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
public interface ValueIndexes
{
/**
- * Get the {@link ImmutableBitmap} corresponding to the supplied value.
Generates an empty bitmap when passed a
- * value that doesn't exist. May return null if a value index cannot be
computed for the supplied value type.
+ * Get the {@link ImmutableBitmap} corresponding to rows matching the
supplied value. Generates an empty bitmap when
+ * passed a value that doesn't exist. May return null if a value index
cannot be computed for the supplied value type.
+ *
+ * Does not match null, use {@link NullValueIndex} for matching nulls.
*
* @param value value to match
* @param valueType type of the value to match, used to assist conversion
from the match value type to the column
@@ -40,5 +43,5 @@ public interface ValueIndexes
* connot be computed for the supplied value type
*/
@Nullable
- BitmapColumnIndex forValue(Object value, TypeSignature<ValueType> valueType);
+ BitmapColumnIndex forValue(@Nonnull Object value, TypeSignature<ValueType>
valueType);
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
index 5624025a20..02c1f6e0a0 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnAndIndexSupplier.java
@@ -65,6 +65,7 @@ import
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -228,11 +229,12 @@ public class ScalarDoubleColumnAndIndexSupplier
implements Supplier<NestedCommon
{
@Nullable
@Override
- public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType>
valueType)
+ public BitmapColumnIndex forValue(@Nonnull Object value,
TypeSignature<ValueType> valueType)
{
final ExprEval<?> eval =
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.DOUBLE);
if (eval.isNumericNull()) {
+ // value wasn't null, but not a number?
return null;
}
final double doubleValue = eval.asDouble();
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
index b097f542cf..c3a23ac5e6 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarDoubleColumnSerializer.java
@@ -19,55 +19,28 @@
package org.apache.druid.segment.nested;
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarDoublesSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.FixedIndexedWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
/**
* Serializer for a {@link ScalarDoubleColumn}
*/
-public class ScalarDoubleColumnSerializer extends
NestedCommonFormatColumnSerializer
+public class ScalarDoubleColumnSerializer extends
ScalarNestedCommonFormatColumnSerializer<Double>
{
- private static final Logger log = new
Logger(ScalarDoubleColumnSerializer.class);
-
- private final String name;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final IndexSpec indexSpec;
- @SuppressWarnings("unused")
- private final Closer closer;
- private DictionaryIdLookup dictionaryIdLookup;
- private FixedIndexedWriter<Double> doubleDictionaryWriter;
- private int rowCount = 0;
- private boolean closedForWrite = false;
- private boolean dictionarySerialized = false;
-
- private SingleValueColumnarIntsSerializer encodedValueSerializer;
private ColumnarDoublesSerializer doublesSerializer;
- private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
- private MutableBitmap[] bitmaps;
- private ByteBuffer columnNameBytes = null;
public ScalarDoubleColumnSerializer(
String name,
@@ -76,54 +49,35 @@ public class ScalarDoubleColumnSerializer extends
NestedCommonFormatColumnSerial
Closer closer
)
{
- this.name = name;
- this.segmentWriteOutMedium = segmentWriteOutMedium;
- this.indexSpec = indexSpec;
- this.closer = closer;
- this.dictionaryIdLookup = new DictionaryIdLookup();
- }
-
- @Override
- public String getColumnName()
- {
- return name;
+ super(name, DOUBLE_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium,
closer);
}
@Override
- public DictionaryIdLookup getGlobalLookup()
+ protected int processValue(@Nullable Object rawValue) throws IOException
{
- return dictionaryIdLookup;
+ final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+ final double val = eval.asDouble();
+ final int dictId = eval.isNumericNull() ? 0 :
dictionaryIdLookup.lookupDouble(val);
+ doublesSerializer.add(dictId == 0 ? 0.0 : val);
+ return dictId;
}
@Override
- public boolean hasNulls()
+ public void openDictionaryWriter() throws IOException
{
- return !bitmaps[0].isEmpty();
+ dictionaryWriter = new FixedIndexedWriter<>(
+ segmentWriteOutMedium,
+ ColumnType.DOUBLE.getStrategy(),
+ ByteOrder.nativeOrder(),
+ Long.BYTES,
+ true
+ );
+ dictionaryWriter.open();
}
@Override
- public void open() throws IOException
+ protected void openValueColumnSerializer() throws IOException
{
- if (!dictionarySerialized) {
- throw new IllegalStateException("Dictionary not serialized, cannot open
value serializer");
- }
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- doubleDictionaryWriter.getCardinality(),
- compressionToUse
- );
- encodedValueSerializer.open();
-
doublesSerializer = CompressionFactory.getDoubleSerializer(
name,
segmentWriteOutMedium,
@@ -132,31 +86,6 @@ public class ScalarDoubleColumnSerializer extends
NestedCommonFormatColumnSerial
indexSpec.getDimensionCompression()
);
doublesSerializer.open();
-
- bitmapIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- bitmaps = new MutableBitmap[doubleDictionaryWriter.getCardinality()];
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- }
-
- @Override
- public void openDictionaryWriter() throws IOException
- {
- doubleDictionaryWriter = new FixedIndexedWriter<>(
- segmentWriteOutMedium,
- ColumnType.DOUBLE.getStrategy(),
- ByteOrder.nativeOrder(),
- Long.BYTES,
- true
- );
- doubleDictionaryWriter.open();
}
@Override
@@ -168,81 +97,26 @@ public class ScalarDoubleColumnSerializer extends
NestedCommonFormatColumnSerial
) throws IOException
{
if (dictionarySerialized) {
- throw new ISE("String dictionary already serialized for column [%s],
cannot serialize again", name);
+ throw new ISE("Double dictionary already serialized for column [%s],
cannot serialize again", name);
}
// null is always 0
- doubleDictionaryWriter.write(null);
+ dictionaryWriter.write(null);
dictionaryIdLookup.addNumericNull();
for (Double value : doubles) {
if (value == null) {
continue;
}
- doubleDictionaryWriter.write(value);
+ dictionaryWriter.write(value);
dictionaryIdLookup.addDouble(value);
}
dictionarySerialized = true;
}
@Override
- public void serialize(ColumnValueSelector<? extends StructuredData>
selector) throws IOException
+ protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{
- if (!dictionarySerialized) {
- throw new ISE("Must serialize value dictionaries before serializing
values for column [%s]", name);
- }
-
- final Object value = StructuredData.unwrap(selector.getObject());
- final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-
- final double val = eval.asDouble();
- final int dictId = eval.isNumericNull() ? 0 :
dictionaryIdLookup.lookupDouble(val);
- encodedValueSerializer.addValue(dictId);
- doublesSerializer.add(dictId == 0 ? 0.0 : val);
- bitmaps[dictId].add(rowCount);
- rowCount++;
- }
-
-
- private void closeForWrite() throws IOException
- {
- if (!closedForWrite) {
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
- }
- }
-
- @Override
- public long getSerializedSize() throws IOException
- {
- closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- return size;
- }
-
- @Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
- {
- Preconditions.checkState(closedForWrite, "Not closed yet!");
-
- writeV0Header(channel, columnNameBytes);
- writeInternal(smoosher, doubleDictionaryWriter,
DOUBLE_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, encodedValueSerializer,
ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, doublesSerializer, DOUBLE_VALUE_COLUMN_FILE_NAME);
- writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
- log.info("Column [%s] serialized successfully.", name);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
index a5170ca9a5..9682c781e5 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnAndIndexSupplier.java
@@ -64,6 +64,7 @@ import
org.apache.druid.segment.index.semantic.StringValueSetIndexes;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -228,11 +229,13 @@ public class ScalarLongColumnAndIndexSupplier implements
Supplier<NestedCommonFo
{
@Nullable
@Override
- public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType>
valueType)
+ public BitmapColumnIndex forValue(@Nonnull Object value,
TypeSignature<ValueType> valueType)
{
+
final ExprEval<?> eval =
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.LONG);
if (eval.isNumericNull()) {
+ // value wasn't null, but not a number
return null;
}
final long longValue = eval.asLong();
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
index b3ade2f0e2..4d5604851d 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarLongColumnSerializer.java
@@ -19,55 +19,28 @@
package org.apache.druid.segment.nested;
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.ColumnarLongsSerializer;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.FixedIndexedWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.WritableByteChannel;
/**
* Serializer for a {@link ScalarLongColumn}
*/
-public class ScalarLongColumnSerializer extends
NestedCommonFormatColumnSerializer
+public class ScalarLongColumnSerializer extends
ScalarNestedCommonFormatColumnSerializer<Long>
{
- private static final Logger log = new
Logger(ScalarLongColumnSerializer.class);
-
- private final String name;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final IndexSpec indexSpec;
- @SuppressWarnings("unused")
- private final Closer closer;
- private DictionaryIdLookup dictionaryIdLookup;
- private FixedIndexedWriter<Long> longDictionaryWriter;
- private int rowCount = 0;
- private boolean closedForWrite = false;
- private boolean dictionarySerialized = false;
-
- private SingleValueColumnarIntsSerializer encodedValueSerializer;
private ColumnarLongsSerializer longsSerializer;
- private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
- private MutableBitmap[] bitmaps;
- private ByteBuffer columnNameBytes = null;
public ScalarLongColumnSerializer(
String name,
@@ -76,54 +49,36 @@ public class ScalarLongColumnSerializer extends
NestedCommonFormatColumnSerializ
Closer closer
)
{
- this.name = name;
- this.segmentWriteOutMedium = segmentWriteOutMedium;
- this.indexSpec = indexSpec;
- this.closer = closer;
- this.dictionaryIdLookup = new DictionaryIdLookup();
+ super(name, LONG_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium,
closer);
}
@Override
- public String getColumnName()
+ protected int processValue(@Nullable Object rawValue) throws IOException
{
- return name;
- }
+ final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
- @Override
- public DictionaryIdLookup getGlobalLookup()
- {
- return dictionaryIdLookup;
+ final long val = eval.asLong();
+ final int dictId = eval.isNumericNull() ? 0 :
dictionaryIdLookup.lookupLong(val);
+ longsSerializer.add(dictId == 0 ? 0L : val);
+ return dictId;
}
@Override
- public boolean hasNulls()
+ public void openDictionaryWriter() throws IOException
{
- return !bitmaps[0].isEmpty();
+ dictionaryWriter = new FixedIndexedWriter<>(
+ segmentWriteOutMedium,
+ ColumnType.LONG.getStrategy(),
+ ByteOrder.nativeOrder(),
+ Long.BYTES,
+ true
+ );
+ dictionaryWriter.open();
}
@Override
- public void open() throws IOException
+ protected void openValueColumnSerializer() throws IOException
{
- if (!dictionarySerialized) {
- throw new IllegalStateException("Dictionary not serialized, cannot open
value serializer");
- }
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- longDictionaryWriter.getCardinality(),
- compressionToUse
- );
- encodedValueSerializer.open();
-
longsSerializer = CompressionFactory.getLongSerializer(
name,
segmentWriteOutMedium,
@@ -133,34 +88,8 @@ public class ScalarLongColumnSerializer extends
NestedCommonFormatColumnSerializ
indexSpec.getDimensionCompression()
);
longsSerializer.open();
-
- bitmapIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- bitmaps = new MutableBitmap[longDictionaryWriter.getCardinality()];
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- }
-
- @Override
- public void openDictionaryWriter() throws IOException
- {
- longDictionaryWriter = new FixedIndexedWriter<>(
- segmentWriteOutMedium,
- ColumnType.LONG.getStrategy(),
- ByteOrder.nativeOrder(),
- Long.BYTES,
- true
- );
- longDictionaryWriter.open();
}
-
@Override
public void serializeDictionaries(
Iterable<String> strings,
@@ -170,81 +99,26 @@ public class ScalarLongColumnSerializer extends
NestedCommonFormatColumnSerializ
) throws IOException
{
if (dictionarySerialized) {
- throw new ISE("String dictionary already serialized for column [%s],
cannot serialize again", name);
+ throw new ISE("Long dictionary already serialized for column [%s],
cannot serialize again", name);
}
// null is always 0
- longDictionaryWriter.write(null);
+ dictionaryWriter.write(null);
dictionaryIdLookup.addNumericNull();
for (Long value : longs) {
if (value == null) {
continue;
}
- longDictionaryWriter.write(value);
+ dictionaryWriter.write(value);
dictionaryIdLookup.addLong(value);
}
dictionarySerialized = true;
}
@Override
- public void serialize(ColumnValueSelector<? extends StructuredData>
selector) throws IOException
+ protected void writeValueColumn(FileSmoosher smoosher) throws IOException
{
- if (!dictionarySerialized) {
- throw new ISE("Must serialize value dictionaries before serializing
values for column [%s]", name);
- }
-
- final Object value = StructuredData.unwrap(selector.getObject());
- final ExprEval<?> eval = ExprEval.bestEffortOf(value);
-
- final long val = eval.asLong();
- final int dictId = eval.isNumericNull() ? 0 :
dictionaryIdLookup.lookupLong(val);
- encodedValueSerializer.addValue(dictId);
- longsSerializer.add(dictId == 0 ? 0L : val);
- bitmaps[dictId].add(rowCount);
- rowCount++;
- }
-
-
- private void closeForWrite() throws IOException
- {
- if (!closedForWrite) {
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
- }
- }
-
- @Override
- public long getSerializedSize() throws IOException
- {
- closeForWrite();
-
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- return size;
- }
-
- @Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
- {
- Preconditions.checkState(closedForWrite, "Not closed yet!");
-
- writeV0Header(channel, columnNameBytes);
- writeInternal(smoosher, longDictionaryWriter, LONG_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, encodedValueSerializer,
ENCODED_VALUE_COLUMN_FILE_NAME);
writeInternal(smoosher, longsSerializer, LONG_VALUE_COLUMN_FILE_NAME);
- writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
- log.info("Column [%s] serialized successfully.", name);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
similarity index 58%
copy from
processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
copy to
processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
index 909d0d50f4..f3ed96942e 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarNestedCommonFormatColumnSerializer.java
@@ -20,69 +20,85 @@
package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.math.expr.ExprEval;
-import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.column.StringEncodingStrategies;
-import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.DictionaryWriter;
+import org.apache.druid.segment.data.FixedIndexedIntWriter;
import org.apache.druid.segment.data.GenericIndexedWriter;
import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
-/**
- * Serializer for a string {@link NestedCommonFormatColumn} that can be read
with
- * {@link StringUtf8DictionaryEncodedColumn}.
- */
-public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerializer
+public abstract class ScalarNestedCommonFormatColumnSerializer<T> extends
NestedCommonFormatColumnSerializer
{
- private static final Logger log = new
Logger(ScalarStringColumnSerializer.class);
+ protected static final Logger log = new
Logger(ScalarNestedCommonFormatColumnSerializer.class);
- private final String name;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final IndexSpec indexSpec;
+ protected final String name;
+ protected final SegmentWriteOutMedium segmentWriteOutMedium;
+ protected final IndexSpec indexSpec;
@SuppressWarnings("unused")
- private final Closer closer;
- private DictionaryIdLookup dictionaryIdLookup;
- private DictionaryWriter<String> dictionaryWriter;
- private int rowCount = 0;
- private boolean closedForWrite = false;
- private boolean dictionarySerialized = false;
-
- private SingleValueColumnarIntsSerializer encodedValueSerializer;
- private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
- private MutableBitmap[] bitmaps;
- private ByteBuffer columnNameBytes = null;
-
- public ScalarStringColumnSerializer(
+ protected final Closer closer;
+ protected final String dictionaryFileName;
+
+ protected DictionaryIdLookup dictionaryIdLookup;
+ protected DictionaryWriter<T> dictionaryWriter;
+ protected boolean closedForWrite = false;
+ protected boolean dictionarySerialized = false;
+ protected FixedIndexedIntWriter intermediateValueWriter;
+ protected ByteBuffer columnNameBytes = null;
+
+ protected boolean hasNulls;
+
+
+ public ScalarNestedCommonFormatColumnSerializer(
String name,
+ String dictionaryFileName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
Closer closer
)
{
this.name = name;
+ this.dictionaryFileName = dictionaryFileName;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.indexSpec = indexSpec;
this.closer = closer;
this.dictionaryIdLookup = new DictionaryIdLookup();
}
+ /**
+ * Called during {@link #serialize(ColumnValueSelector)} to convert value to
dictionary id.
+ * <p>
+ * Implementations may optionally also serialize the value to a type
specific value column if they opened one with
+ * {@link #openValueColumnSerializer()}, or do whatever else is useful to do
while handling a single row value.
+ */
+ protected abstract int processValue(@Nullable Object rawValue) throws
IOException;
+
+ /**
+ * Called during {@link #open()} to allow opening any separate type specific
value column serializers
+ */
+ protected abstract void openValueColumnSerializer() throws IOException;
+
+ /**
+ * Called during {@link #writeTo(WritableByteChannel, FileSmoosher)} to
allow any type specific value column
+ * serializers to use the {@link FileSmoosher} to write stuff to places.
+ */
+ protected abstract void writeValueColumn(FileSmoosher smoosher) throws
IOException;
+
@Override
public String getColumnName()
{
@@ -98,18 +114,7 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
@Override
public boolean hasNulls()
{
- return !bitmaps[0].isEmpty();
- }
-
- @Override
- public void openDictionaryWriter() throws IOException
- {
- dictionaryWriter = StringEncodingStrategies.getStringDictionaryWriter(
- indexSpec.getStringDictionaryEncoding(),
- segmentWriteOutMedium,
- name
- );
- dictionaryWriter.open();
+ return hasNulls;
}
@Override
@@ -118,62 +123,9 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open
value serializer");
}
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- // always compress
- compressionToUse = CompressionStrategy.LZ4;
- }
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- dictionaryWriter.getCardinality(),
- compressionToUse
- );
- encodedValueSerializer.open();
-
- bitmapIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- }
-
- @Override
- public void serializeDictionaries(
- Iterable<String> strings,
- Iterable<Long> longs,
- Iterable<Double> doubles,
- Iterable<int[]> arrays
- ) throws IOException
- {
- if (dictionarySerialized) {
- throw new ISE("String dictionary already serialized for column [%s],
cannot serialize again", name);
- }
-
- // null is always 0
- dictionaryWriter.write(null);
- dictionaryIdLookup.addString(null);
- for (String value : strings) {
- value = NullHandling.emptyToNullIfNeeded(value);
- if (value == null) {
- continue;
- }
-
- dictionaryWriter.write(value);
- dictionaryIdLookup.addString(value);
- }
- dictionarySerialized = true;
+ intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium,
false);
+ intermediateValueWriter.open();
+ openValueColumnSerializer();
}
@Override
@@ -184,31 +136,21 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
}
final Object value = StructuredData.unwrap(selector.getObject());
- final ExprEval<?> eval = ExprEval.bestEffortOf(value);
- final String s = eval.castTo(ExpressionType.STRING).asString();
- final int dictId = dictionaryIdLookup.lookupString(s);
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- rowCount++;
+ final int dictId = processValue(value);
+ intermediateValueWriter.write(dictId);
+ hasNulls = hasNulls || dictId == 0;
}
- private void closeForWrite() throws IOException
+ private void closeForWrite()
{
if (!closedForWrite) {
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
closeForWrite();
@@ -227,9 +169,60 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
+ // write out compressed dictionaryId int column and bitmap indexes by
iterating intermediate value column
+ // the intermediate value column should be replaced someday by a cooler
compressed int column writer that allows
+ // easy iteration of the values it writes out, so that we could just build
the bitmap indexes here instead of
+ // doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ dictionaryWriter.getCardinality(),
+ compressionToUse
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps;
+ bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount++);
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+ indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+
+
writeV0Header(channel, columnNameBytes);
- writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
+ writeInternal(smoosher, dictionaryWriter, dictionaryFileName);
writeInternal(smoosher, encodedValueSerializer,
ENCODED_VALUE_COLUMN_FILE_NAME);
+ writeValueColumn(smoosher);
writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
log.info("Column [%s] serialized successfully.", name);
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
index 909d0d50f4..f28db6e1a0 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/ScalarStringColumnSerializer.java
@@ -19,56 +19,26 @@
package org.apache.druid.segment.nested;
-import com.google.common.base.Preconditions;
-import org.apache.druid.collections.bitmap.ImmutableBitmap;
-import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprEval;
import org.apache.druid.math.expr.ExpressionType;
-import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.StringEncodingStrategies;
import org.apache.druid.segment.column.StringUtf8DictionaryEncodedColumn;
-import org.apache.druid.segment.data.CompressedVSizeColumnarIntsSerializer;
-import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.DictionaryWriter;
-import org.apache.druid.segment.data.GenericIndexedWriter;
-import org.apache.druid.segment.data.SingleValueColumnarIntsSerializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+import javax.annotation.Nullable;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
/**
* Serializer for a string {@link NestedCommonFormatColumn} that can be read
with
* {@link StringUtf8DictionaryEncodedColumn}.
*/
-public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerializer
+public class ScalarStringColumnSerializer extends
ScalarNestedCommonFormatColumnSerializer<String>
{
- private static final Logger log = new
Logger(ScalarStringColumnSerializer.class);
-
- private final String name;
- private final SegmentWriteOutMedium segmentWriteOutMedium;
- private final IndexSpec indexSpec;
- @SuppressWarnings("unused")
- private final Closer closer;
- private DictionaryIdLookup dictionaryIdLookup;
- private DictionaryWriter<String> dictionaryWriter;
- private int rowCount = 0;
- private boolean closedForWrite = false;
- private boolean dictionarySerialized = false;
-
- private SingleValueColumnarIntsSerializer encodedValueSerializer;
- private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
- private MutableBitmap[] bitmaps;
- private ByteBuffer columnNameBytes = null;
-
public ScalarStringColumnSerializer(
String name,
IndexSpec indexSpec,
@@ -76,29 +46,16 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
Closer closer
)
{
- this.name = name;
- this.segmentWriteOutMedium = segmentWriteOutMedium;
- this.indexSpec = indexSpec;
- this.closer = closer;
- this.dictionaryIdLookup = new DictionaryIdLookup();
- }
-
- @Override
- public String getColumnName()
- {
- return name;
- }
-
- @Override
- public DictionaryIdLookup getGlobalLookup()
- {
- return dictionaryIdLookup;
+ super(name, STRING_DICTIONARY_FILE_NAME, indexSpec, segmentWriteOutMedium,
closer);
}
@Override
- public boolean hasNulls()
+ protected int processValue(@Nullable Object rawValue)
{
- return !bitmaps[0].isEmpty();
+ final ExprEval<?> eval = ExprEval.bestEffortOf(rawValue);
+ final String s = eval.castTo(ExpressionType.STRING).asString();
+ final int dictId = dictionaryIdLookup.lookupString(s);
+ return dictId;
}
@Override
@@ -113,40 +70,9 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
}
@Override
- public void open() throws IOException
+ protected void openValueColumnSerializer()
{
- if (!dictionarySerialized) {
- throw new IllegalStateException("Dictionary not serialized, cannot open
value serializer");
- }
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- // always compress
- compressionToUse = CompressionStrategy.LZ4;
- }
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- dictionaryWriter.getCardinality(),
- compressionToUse
- );
- encodedValueSerializer.open();
-
- bitmapIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- bitmaps = new MutableBitmap[dictionaryWriter.getCardinality()];
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
+ // no extra value column for strings
}
@Override
@@ -177,61 +103,8 @@ public class ScalarStringColumnSerializer extends
NestedCommonFormatColumnSerial
}
@Override
- public void serialize(ColumnValueSelector<? extends StructuredData>
selector) throws IOException
+ protected void writeValueColumn(FileSmoosher smoosher)
{
- if (!dictionarySerialized) {
- throw new ISE("Must serialize value dictionaries before serializing
values for column [%s]", name);
- }
-
- final Object value = StructuredData.unwrap(selector.getObject());
- final ExprEval<?> eval = ExprEval.bestEffortOf(value);
- final String s = eval.castTo(ExpressionType.STRING).asString();
- final int dictId = dictionaryIdLookup.lookupString(s);
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
- rowCount++;
- }
-
- private void closeForWrite() throws IOException
- {
- if (!closedForWrite) {
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- columnNameBytes = computeFilenameBytes();
- closedForWrite = true;
- }
- }
-
- @Override
- public long getSerializedSize() throws IOException
- {
- closeForWrite();
-
- // standard string version
- long size = 1 + columnNameBytes.capacity();
- // the value dictionaries, raw column, and null index are all stored in
separate files
- return size;
- }
-
- @Override
- public void writeTo(
- WritableByteChannel channel,
- FileSmoosher smoosher
- ) throws IOException
- {
- Preconditions.checkState(closedForWrite, "Not closed yet!");
- Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
-
- writeV0Header(channel, columnNameBytes);
- writeInternal(smoosher, dictionaryWriter, STRING_DICTIONARY_FILE_NAME);
- writeInternal(smoosher, encodedValueSerializer,
ENCODED_VALUE_COLUMN_FILE_NAME);
- writeInternal(smoosher, bitmapIndexWriter, BITMAP_INDEX_FILE_NAME);
-
- log.info("Column [%s] serialized successfully.", name);
+ // no extra value column for strings
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
index 5fabfefef6..f67123123d 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnAndIndexSupplier.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.math.expr.ExprEval;
+import org.apache.druid.math.expr.ExprType;
import org.apache.druid.math.expr.ExpressionType;
import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.segment.column.ColumnBuilder;
@@ -47,13 +48,16 @@ import
org.apache.druid.segment.data.FrontCodedIntArrayIndexed;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.Indexed;
import org.apache.druid.segment.data.VByte;
+import org.apache.druid.segment.index.AllFalseBitmapColumnIndex;
import org.apache.druid.segment.index.BitmapColumnIndex;
import org.apache.druid.segment.index.SimpleBitmapColumnIndex;
import org.apache.druid.segment.index.SimpleImmutableBitmapIndex;
+import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.serde.NestedCommonFormatColumnPartSerde;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -93,6 +97,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
final Supplier<FixedIndexed<Long>> longDictionarySupplier;
final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
+ final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
final ByteBuffer stringDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
@@ -147,6 +152,11 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
columnName,
NestedCommonFormatColumnSerializer.DOUBLE_DICTIONARY_FILE_NAME
);
+ final ByteBuffer arrayElementDictionaryBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
+ mapper,
+ columnName,
+
NestedCommonFormatColumnSerializer.ARRAY_ELEMENT_DICTIONARY_FILE_NAME
+ );
final ByteBuffer valueIndexBuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
@@ -180,6 +190,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
byteOrder,
Double.BYTES
);
+
final ByteBuffer arrayDictionarybuffer =
NestedCommonFormatColumnPartSerde.loadInternalFile(
mapper,
columnName,
@@ -193,6 +204,12 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
try (ColumnarInts throwAway = ints.get()) {
size = throwAway.size();
}
+ arrayElementDictionarySupplier = FixedIndexed.read(
+ arrayElementDictionaryBuffer,
+ CompressedNestedDataComplexColumn.INT_TYPE_STRATEGY,
+ byteOrder,
+ Integer.BYTES
+ );
return new VariantColumnAndIndexSupplier(
logicalType,
variantTypeByte,
@@ -201,6 +218,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
longDictionarySupplier,
doubleDictionarySupplier,
arrayDictionarySupplier,
+ arrayElementDictionarySupplier,
ints,
valueIndexes,
arrayElementIndexes,
@@ -222,11 +240,11 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
@Nullable
private final Byte variantTypeSetByte;
private final BitmapFactory bitmapFactory;
- private final GenericIndexed<ByteBuffer> stringDictionary;
- private final Supplier<FrontCodedIndexed> frontCodedStringDictionarySupplier;
+ private final Supplier<? extends Indexed<ByteBuffer>>
stringDictionarySupplier;
private final Supplier<FixedIndexed<Long>> longDictionarySupplier;
private final Supplier<FixedIndexed<Double>> doubleDictionarySupplier;
private final Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier;
+ private final Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier;
private final Supplier<ColumnarInts> encodedValueColumnSupplier;
@SuppressWarnings("unused")
private final GenericIndexed<ImmutableBitmap> valueIndexes;
@@ -242,6 +260,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
Supplier<FixedIndexed<Long>> longDictionarySupplier,
Supplier<FixedIndexed<Double>> doubleDictionarySupplier,
Supplier<FrontCodedIntArrayIndexed> arrayDictionarySupplier,
+ Supplier<FixedIndexed<Integer>> arrayElementDictionarySupplier,
Supplier<ColumnarInts> encodedValueColumnSupplier,
GenericIndexed<ImmutableBitmap> valueIndexes,
GenericIndexed<ImmutableBitmap> elementIndexes,
@@ -252,11 +271,13 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
{
this.logicalType = logicalType;
this.variantTypeSetByte = variantTypeSetByte;
- this.stringDictionary = stringDictionary;
- this.frontCodedStringDictionarySupplier =
frontCodedStringDictionarySupplier;
+ stringDictionarySupplier = frontCodedStringDictionarySupplier != null
+ ? frontCodedStringDictionarySupplier
+ : stringDictionary::singleThreaded;
this.longDictionarySupplier = longDictionarySupplier;
this.doubleDictionarySupplier = doubleDictionarySupplier;
this.arrayDictionarySupplier = arrayDictionarySupplier;
+ this.arrayElementDictionarySupplier = arrayElementDictionarySupplier;
this.encodedValueColumnSupplier = encodedValueColumnSupplier;
this.valueIndexes = valueIndexes;
this.arrayElementIndexes = elementIndexes;
@@ -273,20 +294,8 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
@Override
public NestedCommonFormatColumn get()
{
- if (frontCodedStringDictionarySupplier != null) {
- return new VariantColumn<>(
- frontCodedStringDictionarySupplier.get(),
- longDictionarySupplier.get(),
- doubleDictionarySupplier.get(),
- arrayDictionarySupplier.get(),
- encodedValueColumnSupplier.get(),
- nullValueBitmap,
- logicalType,
- variantTypeSetByte
- );
- }
return new VariantColumn<>(
- stringDictionary.singleThreaded(),
+ stringDictionarySupplier.get(),
longDictionarySupplier.get(),
doubleDictionarySupplier.get(),
arrayDictionarySupplier.get(),
@@ -306,6 +315,8 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
return (T) (NullValueIndex) () -> nullIndex;
} else if (clazz.equals(ValueIndexes.class) && variantTypeSetByte == null
&& logicalType.isArray()) {
return (T) new ArrayValueIndexes();
+ } else if (clazz.equals(ArrayElementIndexes.class) && variantTypeSetByte
== null && logicalType.isArray()) {
+ return (T) new VariantArrayElementIndexes();
}
return null;
}
@@ -320,30 +331,42 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
}
+ private ImmutableBitmap getElementBitmap(int idx)
+ {
+ if (idx < 0) {
+ return bitmapFactory.makeEmptyImmutableBitmap();
+ }
+ final int elementDictionaryIndex =
arrayElementDictionarySupplier.get().indexOf(idx);
+ if (elementDictionaryIndex < 0) {
+ return bitmapFactory.makeEmptyImmutableBitmap();
+ }
+ final ImmutableBitmap bitmap =
arrayElementIndexes.get(elementDictionaryIndex);
+ return bitmap == null ? bitmapFactory.makeEmptyImmutableBitmap() : bitmap;
+ }
+
private class ArrayValueIndexes implements ValueIndexes
{
@Nullable
@Override
- public BitmapColumnIndex forValue(Object value, TypeSignature<ValueType>
valueType)
+ public BitmapColumnIndex forValue(@Nonnull Object value,
TypeSignature<ValueType> valueType)
{
final ExprEval<?> eval =
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(valueType), value)
.castTo(ExpressionType.fromColumnTypeStrict(logicalType));
- if (eval.value() == null) {
- return null;
- }
final Object[] arrayToMatch = eval.asArray();
Indexed elements;
+ final int elementOffset;
switch (logicalType.getElementType().getType()) {
case STRING:
- elements = frontCodedStringDictionarySupplier != null
- ? frontCodedStringDictionarySupplier.get()
- : stringDictionary.singleThreaded();
+ elements = stringDictionarySupplier.get();
+ elementOffset = 0;
break;
case LONG:
elements = longDictionarySupplier.get();
+ elementOffset = stringDictionarySupplier.get().size();
break;
case DOUBLE:
elements = doubleDictionarySupplier.get();
+ elementOffset = stringDictionarySupplier.get().size() +
longDictionarySupplier.get().size();
break;
default:
throw DruidException.defensive(
@@ -353,30 +376,29 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
}
final int[] ids = new int[arrayToMatch.length];
- boolean hasMissingElement = false;
+ final int arrayOffset = stringDictionarySupplier.get().size() +
longDictionarySupplier.get().size() + doubleDictionarySupplier.get().size();
for (int i = 0; i < arrayToMatch.length; i++) {
- if (logicalType.getElementType().is(ValueType.STRING)) {
+ if (arrayToMatch[i] == null) {
+ ids[i] = 0;
+ } else if (logicalType.getElementType().is(ValueType.STRING)) {
ids[i] = elements.indexOf(StringUtils.toUtf8ByteBuffer((String)
arrayToMatch[i]));
} else {
- ids[i] = elements.indexOf(arrayToMatch[i]);
+ ids[i] = elements.indexOf(arrayToMatch[i]) + elementOffset;
}
if (ids[i] < 0) {
- hasMissingElement = true;
- break;
+ if (value == null) {
+ return new AllFalseBitmapColumnIndex(bitmapFactory);
+ }
}
}
- final boolean noMatch = hasMissingElement;
final FrontCodedIntArrayIndexed dictionary =
arrayDictionarySupplier.get();
return new SimpleBitmapColumnIndex()
{
@Override
public double estimateSelectivity(int totalRows)
{
- if (noMatch) {
- return 0.0;
- }
- final int id = dictionary.indexOf(ids);
+ final int id = dictionary.indexOf(ids) + arrayOffset;
if (id < 0) {
return 0.0;
}
@@ -386,10 +408,7 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
@Override
public <T> T computeBitmapResult(BitmapResultFactory<T>
bitmapResultFactory)
{
- if (noMatch) {
- return
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
- }
- final int id = dictionary.indexOf(ids);
+ final int id = dictionary.indexOf(ids) + arrayOffset;
if (id < 0) {
return
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
}
@@ -398,4 +417,73 @@ public class VariantColumnAndIndexSupplier implements
Supplier<NestedCommonForma
};
}
}
+
+ private class VariantArrayElementIndexes implements ArrayElementIndexes
+ {
+
+ @Nullable
+ @Override
+ public BitmapColumnIndex containsValue(@Nullable Object value,
TypeSignature<ValueType> elementValueType)
+ {
+ final ExprEval<?> eval =
ExprEval.ofType(ExpressionType.fromColumnTypeStrict(elementValueType), value)
+
.castTo(ExpressionType.fromColumnTypeStrict(logicalType.getElementType()));
+
+ Indexed elements;
+ final int elementOffset;
+ switch (logicalType.getElementType().getType()) {
+ case STRING:
+ elements = stringDictionarySupplier.get();
+ elementOffset = 0;
+ break;
+ case LONG:
+ elements = longDictionarySupplier.get();
+ elementOffset = stringDictionarySupplier.get().size();
+ break;
+ case DOUBLE:
+ elements = doubleDictionarySupplier.get();
+ elementOffset = stringDictionarySupplier.get().size() +
longDictionarySupplier.get().size();
+ break;
+ default:
+ throw DruidException.defensive(
+ "Unhandled array type [%s] how did this happen?",
+ logicalType.getElementType()
+ );
+ }
+
+ return new SimpleBitmapColumnIndex()
+ {
+ @Override
+ public double estimateSelectivity(int totalRows)
+ {
+ final int elementId = getElementId();
+ if (elementId < 0) {
+ return 0.0;
+ }
+ return (double) getElementBitmap(elementId).size() / totalRows;
+ }
+
+ @Override
+ public <T> T computeBitmapResult(BitmapResultFactory<T>
bitmapResultFactory)
+ {
+ final int elementId = getElementId();
+
+ if (elementId < 0) {
+ return
bitmapResultFactory.wrapDimensionValue(bitmapFactory.makeEmptyImmutableBitmap());
+ }
+ return
bitmapResultFactory.wrapDimensionValue(getElementBitmap(elementId));
+ }
+
+ private int getElementId()
+ {
+ if (eval.value() == null) {
+ return 0;
+ } else if (eval.type().is(ExprType.STRING)) {
+ return
elements.indexOf(StringUtils.toUtf8ByteBuffer(eval.asString()));
+ } else {
+ return elements.indexOf(eval.value()) + elementOffset;
+ }
+ }
+ };
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
index 72d5ed0ab3..e906816b04 100644
---
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
+++
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumnSerializer.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.nested;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
+import it.unimi.dsi.fastutil.ints.IntIterator;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.common.config.NullHandling;
@@ -71,15 +72,12 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
private FixedIndexedWriter<Double> doubleDictionaryWriter;
private FrontCodedIntArrayIndexedWriter arrayDictionaryWriter;
private FixedIndexedIntWriter arrayElementDictionaryWriter;
- private int rowCount = 0;
private boolean closedForWrite = false;
private boolean dictionarySerialized = false;
- private SingleValueColumnarIntsSerializer encodedValueSerializer;
- private GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter;
- private GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter;
- private MutableBitmap[] bitmaps;
+ private FixedIndexedIntWriter intermediateValueWriter;
+
private ByteBuffer columnNameBytes = null;
- private final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
+ private boolean hasNulls;
@Nullable
private final Byte variantTypeSetByte;
@@ -114,7 +112,7 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
@Override
public boolean hasNulls()
{
- return !bitmaps[0].isEmpty();
+ return hasNulls;
}
@Override
@@ -161,45 +159,8 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
if (!dictionarySerialized) {
throw new IllegalStateException("Dictionary not serialized, cannot open
value serializer");
}
- String filenameBase = StringUtils.format("%s.forward_dim", name);
- final int cardinality = dictionaryWriter.getCardinality()
- + longDictionaryWriter.getCardinality()
- + doubleDictionaryWriter.getCardinality()
- + arrayDictionaryWriter.getCardinality();
- final CompressionStrategy compression =
indexSpec.getDimensionCompression();
- final CompressionStrategy compressionToUse;
- if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
- compressionToUse = compression;
- } else {
- compressionToUse = CompressionStrategy.LZ4;
- }
- encodedValueSerializer = CompressedVSizeColumnarIntsSerializer.create(
- name,
- segmentWriteOutMedium,
- filenameBase,
- cardinality,
- compressionToUse
- );
- encodedValueSerializer.open();
-
- bitmapIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name,
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- bitmapIndexWriter.open();
- bitmapIndexWriter.setObjectsNotSorted();
- bitmaps = new MutableBitmap[cardinality];
- for (int i = 0; i < bitmaps.length; i++) {
- bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
- }
- arrayElementIndexWriter = new GenericIndexedWriter<>(
- segmentWriteOutMedium,
- name + "_arrays",
- indexSpec.getBitmapSerdeFactory().getObjectStrategy()
- );
- arrayElementIndexWriter.open();
- arrayElementIndexWriter.setObjectsNotSorted();
+ intermediateValueWriter = new FixedIndexedIntWriter(segmentWriteOutMedium,
false);
+ intermediateValueWriter.open();
}
@Override
@@ -211,7 +172,7 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
) throws IOException
{
if (dictionarySerialized) {
- throw new ISE("String dictionary already serialized for column [%s],
cannot serialize again", name);
+ throw new ISE("Value dictionaries already serialized for column [%s],
cannot serialize again", name);
}
// null is always 0
@@ -277,14 +238,10 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
globalIds[i] = -1;
}
Preconditions.checkArgument(globalIds[i] >= 0, "unknown global id [%s]
for value [%s]", globalIds[i], array[i]);
- arrayElements.computeIfAbsent(
- globalIds[i],
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
}
final int dictId = dictionaryIdLookup.lookupArray(globalIds);
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
+ intermediateValueWriter.write(dictId);
+ hasNulls = hasNulls || dictId == 0;
} else {
final Object o = eval.value();
final int dictId;
@@ -300,43 +257,21 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
dictId = -1;
}
Preconditions.checkArgument(dictId >= 0, "unknown global id [%s] for
value [%s]", dictId, o);
- if (dictId != 0) {
- // treat as single element array
- arrayElements.computeIfAbsent(
- dictId,
- (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
- ).add(rowCount);
- }
- encodedValueSerializer.addValue(dictId);
- bitmaps[dictId].add(rowCount);
+ intermediateValueWriter.write(dictId);
+ hasNulls = hasNulls || dictId == 0;
}
-
- rowCount++;
}
- private void closeForWrite() throws IOException
+ private void closeForWrite()
{
if (!closedForWrite) {
- for (int i = 0; i < bitmaps.length; i++) {
- final MutableBitmap bitmap = bitmaps[i];
- bitmapIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
- );
- bitmaps[i] = null; // Reclaim memory
- }
- for (Int2ObjectMap.Entry<MutableBitmap> arrayElement :
arrayElements.int2ObjectEntrySet()) {
- arrayElementDictionaryWriter.write(arrayElement.getIntKey());
- arrayElementIndexWriter.write(
-
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
- );
- }
columnNameBytes = computeFilenameBytes();
closedForWrite = true;
}
}
@Override
- public long getSerializedSize() throws IOException
+ public long getSerializedSize()
{
closeForWrite();
@@ -357,6 +292,87 @@ public class VariantColumnSerializer extends
NestedCommonFormatColumnSerializer
Preconditions.checkState(closedForWrite, "Not closed yet!");
Preconditions.checkArgument(dictionaryWriter.isSorted(), "Dictionary not
sorted?!?");
+ // write out compressed dictionaryId int column, bitmap indexes, and array
element bitmap indexes
+ // by iterating intermediate value column the intermediate value column
should be replaced someday by a cooler
+ // compressed int column writer that allows easy iteration of the values
it writes out, so that we could just
+ // build the bitmap indexes here instead of doing both things
+ String filenameBase = StringUtils.format("%s.forward_dim", name);
+ final int cardinality = dictionaryWriter.getCardinality()
+ + longDictionaryWriter.getCardinality()
+ + doubleDictionaryWriter.getCardinality()
+ + arrayDictionaryWriter.getCardinality();
+ final CompressionStrategy compression =
indexSpec.getDimensionCompression();
+ final CompressionStrategy compressionToUse;
+ if (compression != CompressionStrategy.UNCOMPRESSED && compression !=
CompressionStrategy.NONE) {
+ compressionToUse = compression;
+ } else {
+ compressionToUse = CompressionStrategy.LZ4;
+ }
+
+ final SingleValueColumnarIntsSerializer encodedValueSerializer =
CompressedVSizeColumnarIntsSerializer.create(
+ name,
+ segmentWriteOutMedium,
+ filenameBase,
+ cardinality,
+ compressionToUse
+ );
+ encodedValueSerializer.open();
+
+ final GenericIndexedWriter<ImmutableBitmap> bitmapIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name,
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ bitmapIndexWriter.open();
+ bitmapIndexWriter.setObjectsNotSorted();
+ final MutableBitmap[] bitmaps = new MutableBitmap[cardinality];
+ final Int2ObjectRBTreeMap<MutableBitmap> arrayElements = new
Int2ObjectRBTreeMap<>();
+ for (int i = 0; i < bitmaps.length; i++) {
+ bitmaps[i] =
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();
+ }
+ final GenericIndexedWriter<ImmutableBitmap> arrayElementIndexWriter = new
GenericIndexedWriter<>(
+ segmentWriteOutMedium,
+ name + "_arrays",
+ indexSpec.getBitmapSerdeFactory().getObjectStrategy()
+ );
+ arrayElementIndexWriter.open();
+ arrayElementIndexWriter.setObjectsNotSorted();
+
+ final IntIterator rows = intermediateValueWriter.getIterator();
+ int rowCount = 0;
+ final int arrayBaseId = dictionaryWriter.getCardinality()
+ + longDictionaryWriter.getCardinality()
+ + doubleDictionaryWriter.getCardinality();
+ while (rows.hasNext()) {
+ final int dictId = rows.nextInt();
+ encodedValueSerializer.addValue(dictId);
+ bitmaps[dictId].add(rowCount);
+ if (dictId >= arrayBaseId) {
+ int[] array = arrayDictionaryWriter.get(dictId - arrayBaseId);
+ for (int elementId : array) {
+ arrayElements.computeIfAbsent(
+ elementId,
+ (id) ->
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
+ ).add(rowCount);
+ }
+ }
+ rowCount++;
+ }
+
+ for (int i = 0; i < bitmaps.length; i++) {
+ final MutableBitmap bitmap = bitmaps[i];
+ bitmapIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(bitmap)
+ );
+ bitmaps[i] = null; // Reclaim memory
+ }
+ for (Int2ObjectMap.Entry<MutableBitmap> arrayElement :
arrayElements.int2ObjectEntrySet()) {
+ arrayElementDictionaryWriter.write(arrayElement.getIntKey());
+ arrayElementIndexWriter.write(
+
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeImmutableBitmap(arrayElement.getValue())
+ );
+ }
+
writeV0Header(channel, columnNameBytes);
if (variantTypeSetByte != null) {
channel.write(ByteBuffer.wrap(new byte[]{variantTypeSetByte}));
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
index 383a2046db..ac7bd1c684 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarDoubleColumnSupplierTest.java
@@ -47,6 +47,7 @@ import
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -248,6 +249,7 @@ public class ScalarDoubleColumnSupplierTest extends
InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector =
column.makeColumnValueSelector(offset);
VectorValueSelector vectorValueSelector =
column.makeVectorValueSelector(vectorOffset);
+ ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex =
supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex =
supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -279,6 +281,7 @@ public class ScalarDoubleColumnSupplierTest extends
InitializedNullHandlingTest
}
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
+ Assert.assertTrue(valueIndexes.forValue(row,
ColumnType.DOUBLE).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new
TreeSet<>(ImmutableSet.of(String.valueOf(row))))
.computeBitmapResult(resultFactory)
.get(i));
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
index 87c4fcd830..ebd27fa794 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarLongColumnSupplierTest.java
@@ -47,6 +47,7 @@ import
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.VectorValueSelector;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -248,6 +249,7 @@ public class ScalarLongColumnSupplierTest extends
InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector =
column.makeColumnValueSelector(offset);
VectorValueSelector vectorValueSelector =
column.makeVectorValueSelector(vectorOffset);
+ ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex =
supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex =
supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -279,6 +281,7 @@ public class ScalarLongColumnSupplierTest extends
InitializedNullHandlingTest
}
Assert.assertTrue(valueSetIndex.forValue(String.valueOf(row)).computeBitmapResult(resultFactory).get(i));
+ Assert.assertTrue(valueIndexes.forValue(row,
ColumnType.LONG).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new
TreeSet<>(ImmutableSet.of(String.valueOf(row))))
.computeBitmapResult(resultFactory)
.get(i));
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
index da5edc734e..8b48534ac5 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/ScalarStringColumnSupplierTest.java
@@ -49,6 +49,7 @@ import
org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -248,6 +249,7 @@ public class ScalarStringColumnSupplierTest extends
InitializedNullHandlingTest
ColumnValueSelector<?> valueSelector =
column.makeColumnValueSelector(offset);
DimensionSelector dimSelector = column.makeDimensionSelector(offset, null);
+ ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
StringValueSetIndexes valueSetIndex =
supplier.as(StringValueSetIndexes.class);
DruidPredicateIndexes predicateIndex =
supplier.as(DruidPredicateIndexes.class);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
@@ -270,6 +272,7 @@ public class ScalarStringColumnSupplierTest extends
InitializedNullHandlingTest
Assert.assertEquals(dimSelector.idLookup().lookupId(dimSelectorLookupVal),
dimSelector.getRow().get(0));
Assert.assertTrue(valueSetIndex.forValue(row).computeBitmapResult(resultFactory).get(i));
+ Assert.assertTrue(valueIndexes.forValue(row,
ColumnType.STRING).computeBitmapResult(resultFactory).get(i));
Assert.assertTrue(valueSetIndex.forSortedValues(new
TreeSet<>(ImmutableSet.of(row)))
.computeBitmapResult(resultFactory)
.get(i));
diff --git
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
index 69fe1dc9bf..401e5d87eb 100644
---
a/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
+++
b/processing/src/test/java/org/apache/druid/segment/nested/VariantColumnSupplierTest.java
@@ -47,9 +47,11 @@ import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.FrontCodedIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.index.semantic.ArrayElementIndexes;
import org.apache.druid.segment.index.semantic.DruidPredicateIndexes;
import org.apache.druid.segment.index.semantic.NullValueIndex;
import org.apache.druid.segment.index.semantic.StringValueSetIndexes;
+import org.apache.druid.segment.index.semantic.ValueIndexes;
import org.apache.druid.segment.vector.NoFilterVectorOffset;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorObjectSelector;
@@ -377,6 +379,15 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
Assert.assertNull(predicateIndex);
NullValueIndex nullValueIndex = supplier.as(NullValueIndex.class);
Assert.assertNotNull(nullValueIndex);
+ ValueIndexes valueIndexes = supplier.as(ValueIndexes.class);
+ ArrayElementIndexes arrayElementIndexes =
supplier.as(ArrayElementIndexes.class);
+ if (expectedType.getSingleType() != null &&
expectedType.getSingleType().isArray()) {
+ Assert.assertNotNull(valueIndexes);
+ Assert.assertNotNull(arrayElementIndexes);
+ } else {
+ Assert.assertNull(valueIndexes);
+ Assert.assertNull(arrayElementIndexes);
+ }
SortedMap<String, FieldTypeInfo.MutableTypeSet> fields =
column.getFieldTypeInfo();
Assert.assertEquals(1, fields.size());
@@ -397,6 +408,10 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
Assert.assertArrayEquals(((List) row).toArray(), (Object[])
valueSelector.getObject());
if (expectedType.getSingleType() != null) {
Assert.assertArrayEquals(((List) row).toArray(), (Object[])
vectorObjectSelector.getObjectVector()[0]);
+ Assert.assertTrue(valueIndexes.forValue(row,
expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
+ for (Object o : ((List) row)) {
+ Assert.assertTrue("Failed on row: " + row,
arrayElementIndexes.containsValue(o,
expectedType.getSingleType().getElementType()).computeBitmapResult(resultFactory).get(i));
+ }
} else {
// mixed type vector object selector coerces to the most common
type
Assert.assertArrayEquals(ExprEval.ofType(expressionType,
row).asArray(), (Object[]) vectorObjectSelector.getObjectVector()[0]);
@@ -440,6 +455,9 @@ public class VariantColumnSupplierTest extends
InitializedNullHandlingTest
}
}
Assert.assertTrue(nullValueIndex.get().computeBitmapResult(resultFactory).get(i));
+ if (expectedType.getSingleType() != null) {
+ Assert.assertFalse(arrayElementIndexes.containsValue(null,
expectedType.getSingleType()).computeBitmapResult(resultFactory).get(i));
+ }
}
offset.increment();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]