imply-cheddar commented on code in PR #13803:
URL: https://github.com/apache/druid/pull/13803#discussion_r1148769259
##########
processing/src/main/java/org/apache/druid/math/expr/ExprEval.java:
##########
@@ -370,6 +370,18 @@ public static ExprEval ofComplex(ExpressionType
outputType, @Nullable Object val
return new ComplexExprEval(outputType, value);
}
+ public static ExprEval bestEffortArray(@Nullable List<?> theList)
+ {
+ // do not convert empty lists to arrays with a single null element here,
because that should have been done
+ // by the selectors preparing their ObjectBindings if necessary. If we get
to this point it was legitimately
+ // empty
+ NonnullPair<ExpressionType, Object[]> coerced = coerceListToArray(theList,
false);
+ if (coerced == null) {
+ return bestEffortOf(null);
Review Comment:
Calling `bestEffortOf(null)` looks to me like it's going to go through like
15 if statements, failing them all before just returning `new
StringExprEval(null)`. Why not just return the good thing here given that we
already know what it should be and avoid the potential branches?
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
};
}
- static class LiteralFieldIndexer
+ static class FieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;
- private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+ private final NestedFieldTypeInfo.MutableTypeSet typeSet;
- LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+ FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
{
this.globalDimensionDictionary = globalDimensionDictionary;
- this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+ this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
}
- private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable
Object value)
+ private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?>
eval)
{
- // null value is always added to the global dictionary as id 0, so we
can ignore them here
- if (value != null) {
- // why not
- ExprEval<?> eval = ExprEval.bestEffortOf(value);
- final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
- switch (columnType.getType()) {
- case LONG:
- globalDimensionDictionary.addLongValue(eval.asLong());
- typeSet.add(ColumnType.LONG);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asLong(),
- StructuredDataProcessor.getLongObjectEstimateSize()
- );
- case DOUBLE:
- globalDimensionDictionary.addDoubleValue(eval.asDouble());
- typeSet.add(ColumnType.DOUBLE);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asDouble(),
- StructuredDataProcessor.getDoubleObjectEstimateSize()
- );
- case STRING:
- default:
- final String asString = eval.asString();
- globalDimensionDictionary.addStringValue(asString);
- typeSet.add(ColumnType.STRING);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asString(),
- StructuredDataProcessor.estimateStringSize(asString)
- );
- }
+ final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+ int sizeEstimate;
+ switch (columnType.getType()) {
+ case LONG:
+ typeSet.add(ColumnType.LONG);
+ sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(),
sizeEstimate);
+ case DOUBLE:
+ typeSet.add(ColumnType.DOUBLE);
+ sizeEstimate =
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(),
sizeEstimate);
+ case ARRAY:
+ // sanity check, this should never happen
+ Preconditions.checkNotNull(
+ columnType.getElementType(),
+ "Array type [%s] for value [%s] missing element type, how did
this possibly happen?",
+ eval.type(),
+ eval.valueOrDefault()
+ );
+ switch (columnType.getElementType().getType()) {
+ case LONG:
+ typeSet.add(ColumnType.LONG_ARRAY);
+ final Object[] longArray = eval.asArray();
+ sizeEstimate = globalDimensionDictionary.addLongArray(longArray);
+ return new StructuredDataProcessor.ProcessedValue<>(longArray,
sizeEstimate);
+ case DOUBLE:
+ typeSet.add(ColumnType.DOUBLE_ARRAY);
+ final Object[] doubleArray = eval.asArray();
+ sizeEstimate =
globalDimensionDictionary.addDoubleArray(doubleArray);
+ return new StructuredDataProcessor.ProcessedValue<>(doubleArray,
sizeEstimate);
+ case STRING:
+ final Object[] stringArray = eval.asArray();
+ // empty arrays and arrays with all nulls are detected as string
arrays, but don't count them as part of
+ // the type set
+ if (stringArray.length > 0 &&
!Arrays.stream(stringArray).allMatch(Objects::isNull)) {
+ typeSet.add(ColumnType.STRING_ARRAY);
+ }
+ sizeEstimate =
globalDimensionDictionary.addStringArray(stringArray);
+ return new StructuredDataProcessor.ProcessedValue<>(stringArray,
sizeEstimate);
+ default:
+ throw new IAE("Unhandled type: %s", columnType);
+ }
+ case STRING:
+ default:
Review Comment:
What's an example of when we hit the `default` case? I'm legitimately
asking because I do not know the answer and find myself wondering if believing
that it's a String is really the right thing to do versus generating a parsing
error or something of that nature.
##########
processing/src/main/java/org/apache/druid/segment/nested/NestedFieldTypeInfo.java:
##########
@@ -31,29 +32,36 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import java.util.Objects;
import java.util.Set;
/**
* Binary serialization for nested field type info, translated into this
compact format for storage in segments.
* The index of the type info here is the same as the field index in {@link
CompressedNestedDataComplexColumn#fields}
*/
-public class NestedLiteralTypeInfo
+public class NestedFieldTypeInfo
{
private static final byte STRING_MASK = 1;
private static final byte LONG_MASK = 1 << 2;
private static final byte DOUBLE_MASK = 1 << 3;
- public static NestedLiteralTypeInfo read(ByteBuffer buffer, int length)
+ private static final byte STRING_ARRAY_MASK = 1 << 4;
+
+ private static final byte LONG_ARRAY_MASK = 1 << 5;
+
+ private static final byte DOUBLE_ARRAY_MASK = 1 << 6;
Review Comment:
I dunno if I'm being stingy, but couldn't we have an `array` mask that is
applied and then we check the type mask after that? Would mean that we can add
array support to this and only consume 1 more bit instead of consuming 3 more
bits.
##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIntArrayIndexed.java:
##########
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing int arrays, which must be sorted
and unique, using 'front coding'.
+ *
+ * Front coding is a type of delta encoding, where sorted values are grouped
into buckets. The first value of the bucket
+ * is written entirely, and remaining values are stored as a pair of an
integer which indicates how much of the first
+ * int array of the bucket to use as a prefix, followed by the remaining ints
after the prefix to complete the value.
+ *
+ * front coded indexed layout:
+ * | version | bucket size | has null? | number of values | size of "offsets"
+ "buckets" | "offsets" | "buckets" |
+ * | ------- | ----------- | --------- | ---------------- |
----------------------------- | --------- | --------- |
+ * | byte | byte | byte | vbyte int |
vbyte int | int[] | bucket[] |
+ *
+ * "offsets" are the ending offsets of each bucket stored in order, stored as
plain integers for easy random access.
+ *
+ * bucket layout:
+ * | first value | prefix length | fragment | ... | prefix length | fragment |
+ * | ----------- | ------------- | -------- | --- | ------------- | -------- |
+ * | int[] | vbyte int | int[] | ... | vbyte int | int[] |
+ *
+ * int array layout:
+ * | length | ints |
+ * | ----------- | ----- |
+ * | vbyte int | int[] |
+ *
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ *
+ * This class is not thread-safe since during operation modifies positions of
a shared buffer.
+ */
+public final class FrontCodedIntArrayIndexed implements Indexed<int[]>
+{
+ public static Supplier<FrontCodedIntArrayIndexed> read(ByteBuffer buffer,
ByteOrder ordering)
+ {
+ final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+ final byte version = orderedBuffer.get();
+ Preconditions.checkArgument(version == 0, "only V0 exists, encountered " +
version);
+ final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get());
+ final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+ final int numValues = VByte.readInt(orderedBuffer);
+ // size of offsets + values
+ final int size = VByte.readInt(orderedBuffer);
+ final int offsetsPosition = orderedBuffer.position();
+ // move position to end of buffer
+ buffer.position(offsetsPosition + size);
+
+ return () -> new FrontCodedIntArrayIndexed(
+ buffer,
+ ordering,
+ bucketSize,
+ numValues,
+ hasNull,
+ offsetsPosition
+ );
+ }
+
+ private final ByteBuffer buffer;
+ private final int adjustedNumValues;
+ private final int adjustIndex;
+ private final int bucketSize;
+ private final int numBuckets;
+ private final int div;
+ private final int rem;
+ private final int offsetsPosition;
+ private final int bucketsPosition;
+ private final boolean hasNull;
+ private final int lastBucketNumValues;
+
+ private FrontCodedIntArrayIndexed(
+ ByteBuffer buffer,
+ ByteOrder order,
+ int bucketSize,
+ int numValues,
+ boolean hasNull,
+ int offsetsPosition
+ )
+ {
+ if (Integer.bitCount(bucketSize) != 1) {
+ throw new ISE("bucketSize must be a power of two but was[%,d]",
bucketSize);
+ }
+ this.buffer = buffer.asReadOnlyBuffer().order(order);
+ this.bucketSize = bucketSize;
+ this.hasNull = hasNull;
+
+ this.numBuckets = (int) Math.ceil((double) numValues / (double)
bucketSize);
+ this.adjustIndex = hasNull ? 1 : 0;
+ this.adjustedNumValues = numValues + adjustIndex;
+ this.div = Integer.numberOfTrailingZeros(bucketSize);
+ this.rem = bucketSize - 1;
+ this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues
& rem;
+ this.offsetsPosition = offsetsPosition;
+ this.bucketsPosition = offsetsPosition + ((numBuckets - 1) *
Integer.BYTES);
+ }
+
+ @Override
+ public int size()
+ {
+ return adjustedNumValues;
+ }
+
+ @Nullable
+ @Override
+ public int[] get(int index)
+ {
+ if (hasNull && index == 0) {
+ return null;
+ }
+ Indexed.checkIndex(index, adjustedNumValues);
+
+ // due to vbyte encoding, the null value is not actually stored in the
bucket (no negative values), so we adjust
+ // the index
Review Comment:
If I understood correctly, can I suggest updating the comment to be
```
// For arrays, there is a conundrum of how to encode null different from the
empty array. In this code,
// we take the following approach:
// 1) the 0 entry in our dictionary is assumed to be indicative of a
completely null entry
// 2) The 1 value in our dictionary is assumed to be the empty array
// 3) Instead of storing the 0 value in this dictionary, we employ an
indexing-shifting technique, where the
// dictionary never stores null, and starts with the empty array
(index 1), but shifted by 1 such that what
// is persisted on disk is actually 0 instead of 1.
// adjustIndex represents whether the null existed and thus whether we
actually need to adjust the value
```
##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -81,46 +87,75 @@ public class NestedDataColumnSerializer implements
GenericColumnSerializer<Struc
private final StructuredDataProcessor fieldProcessor = new
StructuredDataProcessor()
{
@Override
- public StructuredDataProcessor.ProcessedLiteral<?>
processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
+ public ProcessedValue<?> processField(ArrayList<NestedPathPart> fieldPath,
@Nullable Object fieldValue)
{
final GlobalDictionaryEncodedFieldColumnWriter<?> writer =
fieldWriters.get(
NestedPathFinder.toNormalizedJsonPath(fieldPath)
);
if (writer != null) {
try {
- ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
+ final ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
if (eval.type().isPrimitive() || (eval.type().isArray() &&
eval.type().getElementType().isPrimitive())) {
writer.addValue(rowCount, eval.value());
} else {
// behave consistently with nested column indexer, which defaults
to string
writer.addValue(rowCount, eval.asString());
}
// serializer doesn't use size estimate
- return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
+ return ProcessedValue.NULL_LITERAL;
}
catch (IOException e) {
- throw new RuntimeException(":(");
+ throw new RE(e, "Failed to write field [%s] value [%s]", fieldPath,
fieldValue);
}
}
- return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
+ return ProcessedValue.NULL_LITERAL;
+ }
+
+ @Nullable
+ @Override
+ public ProcessedValue<?> processArrayField(
+ ArrayList<NestedPathPart> fieldPath,
+ @Nullable List<?> array
+ )
+ {
+ final ExprEval<?> eval = ExprEval.bestEffortArray(array);
+ if (eval.type().isArray() && eval.type().getElementType().isPrimitive())
{
+ final GlobalDictionaryEncodedFieldColumnWriter<?> writer =
fieldWriters.get(
+ NestedPathFinder.toNormalizedJsonPath(fieldPath)
+ );
+ if (writer != null) {
+ try {
+ writer.addValue(rowCount, eval.value());
+ // serializer doesn't use size estimate
+ return ProcessedValue.NULL_LITERAL;
+ }
+ catch (IOException e) {
+ throw new RE(e, "Failed to write field [%s] value [%s]",
fieldPath, array);
Review Comment:
This exception leaks the content of the data if it ever gets built. I think
the "best" we can do here is mention which field it was trying to write.
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -234,7 +272,9 @@ private GlobalDictionarySortedCollector
getSortedIndexFromV1QueryableAdapterNest
return new GlobalDictionarySortedCollector(
new
StringEncodingStrategies.Utf8ToStringIndexed(column.getStringDictionary()),
column.getLongDictionary(),
- column.getDoubleDictionary()
+ column.getDoubleDictionary(),
+ column.getArraysIterable(),
+ column.getArrayDictionary().size()
Review Comment:
Sure, doesn't have to be an `Indexed`, just also seemed weird to be passing
in fully encapsulated objects above and then suddenly start passing in 2
arguments to wrap a new thing that seems similar to the ones above...
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
};
}
- static class LiteralFieldIndexer
+ static class FieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;
- private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+ private final NestedFieldTypeInfo.MutableTypeSet typeSet;
- LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+ FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
{
this.globalDimensionDictionary = globalDimensionDictionary;
- this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+ this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
}
- private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable
Object value)
+ private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?>
eval)
{
- // null value is always added to the global dictionary as id 0, so we
can ignore them here
- if (value != null) {
- // why not
- ExprEval<?> eval = ExprEval.bestEffortOf(value);
- final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
- switch (columnType.getType()) {
- case LONG:
- globalDimensionDictionary.addLongValue(eval.asLong());
- typeSet.add(ColumnType.LONG);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asLong(),
- StructuredDataProcessor.getLongObjectEstimateSize()
- );
- case DOUBLE:
- globalDimensionDictionary.addDoubleValue(eval.asDouble());
- typeSet.add(ColumnType.DOUBLE);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asDouble(),
- StructuredDataProcessor.getDoubleObjectEstimateSize()
- );
- case STRING:
- default:
- final String asString = eval.asString();
- globalDimensionDictionary.addStringValue(asString);
- typeSet.add(ColumnType.STRING);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asString(),
- StructuredDataProcessor.estimateStringSize(asString)
- );
- }
+ final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+ int sizeEstimate;
+ switch (columnType.getType()) {
+ case LONG:
+ typeSet.add(ColumnType.LONG);
+ sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(),
sizeEstimate);
+ case DOUBLE:
+ typeSet.add(ColumnType.DOUBLE);
+ sizeEstimate =
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(),
sizeEstimate);
+ case ARRAY:
+ // sanity check, this should never happen
+ Preconditions.checkNotNull(
+ columnType.getElementType(),
+ "Array type [%s] for value [%s] missing element type, how did
this possibly happen?",
+ eval.type(),
+ eval.valueOrDefault()
+ );
Review Comment:
Nit: I think this would be better as an `if` statement. If this ever gets
thrown, someone is gonna want to attach a debugger to the point that this
gets thrown from and they are gonna need to convert it to an if statement to be
able to do that without setting some conditions and stuff on their debug
breakpoint.
##########
sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java:
##########
@@ -190,6 +190,7 @@ public static void setupNullValues()
public static final Map<String, Object> QUERY_CONTEXT_NO_STRINGIFY_ARRAY =
DEFAULT_QUERY_CONTEXT_BUILDER.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
+ .put(PlannerContext.CTX_ENABLE_UNNEST, true)
Review Comment:
Not sure how much I care about this, but I expected this to be put on the
Unnest/Array tests instead of across all of the tests in this base class.
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
};
}
- static class LiteralFieldIndexer
+ static class FieldIndexer
{
private final GlobalDimensionDictionary globalDimensionDictionary;
- private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+ private final NestedFieldTypeInfo.MutableTypeSet typeSet;
- LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+ FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
{
this.globalDimensionDictionary = globalDimensionDictionary;
- this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+ this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
}
- private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable
Object value)
+ private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?>
eval)
{
- // null value is always added to the global dictionary as id 0, so we
can ignore them here
- if (value != null) {
- // why not
- ExprEval<?> eval = ExprEval.bestEffortOf(value);
- final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
- switch (columnType.getType()) {
- case LONG:
- globalDimensionDictionary.addLongValue(eval.asLong());
- typeSet.add(ColumnType.LONG);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asLong(),
- StructuredDataProcessor.getLongObjectEstimateSize()
- );
- case DOUBLE:
- globalDimensionDictionary.addDoubleValue(eval.asDouble());
- typeSet.add(ColumnType.DOUBLE);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asDouble(),
- StructuredDataProcessor.getDoubleObjectEstimateSize()
- );
- case STRING:
- default:
- final String asString = eval.asString();
- globalDimensionDictionary.addStringValue(asString);
- typeSet.add(ColumnType.STRING);
- return new StructuredDataProcessor.ProcessedLiteral<>(
- eval.asString(),
- StructuredDataProcessor.estimateStringSize(asString)
- );
- }
+ final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+ int sizeEstimate;
+ switch (columnType.getType()) {
+ case LONG:
+ typeSet.add(ColumnType.LONG);
+ sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(),
sizeEstimate);
+ case DOUBLE:
+ typeSet.add(ColumnType.DOUBLE);
+ sizeEstimate =
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+ return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(),
sizeEstimate);
+ case ARRAY:
+ // sanity check, this should never happen
+ Preconditions.checkNotNull(
+ columnType.getElementType(),
+ "Array type [%s] for value [%s] missing element type, how did
this possibly happen?",
+ eval.type(),
+ eval.valueOrDefault()
+ );
+ switch (columnType.getElementType().getType()) {
+ case LONG:
+ typeSet.add(ColumnType.LONG_ARRAY);
+ final Object[] longArray = eval.asArray();
+ sizeEstimate = globalDimensionDictionary.addLongArray(longArray);
+ return new StructuredDataProcessor.ProcessedValue<>(longArray,
sizeEstimate);
+ case DOUBLE:
+ typeSet.add(ColumnType.DOUBLE_ARRAY);
+ final Object[] doubleArray = eval.asArray();
+ sizeEstimate =
globalDimensionDictionary.addDoubleArray(doubleArray);
+ return new StructuredDataProcessor.ProcessedValue<>(doubleArray,
sizeEstimate);
+ case STRING:
+ final Object[] stringArray = eval.asArray();
+ // empty arrays and arrays with all nulls are detected as string
arrays, but don't count them as part of
+ // the type set
+ if (stringArray.length > 0 &&
!Arrays.stream(stringArray).allMatch(Objects::isNull)) {
+ typeSet.add(ColumnType.STRING_ARRAY);
+ }
+ sizeEstimate =
globalDimensionDictionary.addStringArray(stringArray);
+ return new StructuredDataProcessor.ProcessedValue<>(stringArray,
sizeEstimate);
+ default:
+ throw new IAE("Unhandled type: %s", columnType);
Review Comment:
Just double checking, is this where we expect an array-of-arrays or an
array-of-objects to end up?
##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,99 +94,129 @@ public NestedDataColumnMerger(
@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters)
throws IOException
{
+ try {
+ long dimStartTime = System.currentTimeMillis();
+
+ int numMergeIndex = 0;
+ GlobalDictionarySortedCollector sortedLookup = null;
+ final Indexed[] sortedLookups = new Indexed[adapters.size()];
+ final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+ final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+ final Iterable<Object[]>[] sortedArrayLookups = new
Iterable[adapters.size()];
+
+ final SortedMap<String, NestedFieldTypeInfo.MutableTypeSet> mergedFields
= new TreeMap<>();
+
+ for (int i = 0; i < adapters.size(); i++) {
+ final IndexableAdapter adapter = adapters.get(i);
+ final GlobalDictionarySortedCollector dimValues;
+ if (adapter instanceof IncrementalIndexAdapter) {
+ dimValues =
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter,
mergedFields);
+ } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+ dimValues =
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter,
mergedFields);
+ } else {
+ throw new ISE("Unable to merge columns of unsupported adapter [%s]",
adapter.getClass());
+ }
- long dimStartTime = System.currentTimeMillis();
-
- int numMergeIndex = 0;
- GlobalDictionarySortedCollector sortedLookup = null;
- final Indexed[] sortedLookups = new Indexed[adapters.size()];
- final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
- final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+ boolean allNulls = dimValues == null || dimValues.allNull();
+ sortedLookup = dimValues;
+ if (!allNulls) {
+ sortedLookups[i] = dimValues.getSortedStrings();
+ sortedLongLookups[i] = dimValues.getSortedLongs();
+ sortedDoubleLookups[i] = dimValues.getSortedDoubles();
+ sortedArrayLookups[i] = dimValues.getSortedArrays();
+ numMergeIndex++;
+ }
+ }
- final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields
= new TreeMap<>();
+ descriptorBuilder = new ColumnDescriptor.Builder();
- for (int i = 0; i < adapters.size(); i++) {
- final IndexableAdapter adapter = adapters.get(i);
- final GlobalDictionarySortedCollector dimValues;
- if (adapter instanceof IncrementalIndexAdapter) {
- dimValues =
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter,
mergedFields);
- } else if (adapter instanceof QueryableIndexIndexableAdapter) {
- dimValues =
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter,
mergedFields);
+ final NestedDataColumnSerializer defaultSerializer = new
NestedDataColumnSerializer(
+ name,
+ indexSpec,
+ segmentWriteOutMedium,
+ progressIndicator,
+ closer
+ );
+ serializer = defaultSerializer;
+
+ final ComplexColumnPartSerde partSerde =
ComplexColumnPartSerde.serializerBuilder()
+
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
+
.withDelegate(serializer)
+ .build();
+ descriptorBuilder.setValueType(ValueType.COMPLEX)
+ .setHasMultipleValues(false)
+ .addSerde(partSerde);
+
+ defaultSerializer.open();
+ defaultSerializer.serializeFields(mergedFields);
+
+ int stringCardinality;
+ int longCardinality;
+ int doubleCardinality;
+ int arrayCardinality;
+ if (numMergeIndex == 1) {
+ defaultSerializer.serializeDictionaries(
+ sortedLookup.getSortedStrings(),
+ sortedLookup.getSortedLongs(),
+ sortedLookup.getSortedDoubles(),
+ () -> new ArrayDictionaryMergingIterator(
+ sortedArrayLookups,
+ defaultSerializer.getGlobalLookup()
+ )
+ );
+ stringCardinality = sortedLookup.getStringCardinality();
+ longCardinality = sortedLookup.getLongCardinality();
+ doubleCardinality = sortedLookup.getDoubleCardinality();
+ arrayCardinality = sortedLookup.getArrayCardinality();
} else {
- throw new ISE("Unable to merge columns of unsupported adapter %s",
adapter.getClass());
+ final SimpleDictionaryMergingIterator<String> stringIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedLookups,
+ STRING_MERGING_COMPARATOR
+ );
+ final SimpleDictionaryMergingIterator<Long> longIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedLongLookups,
+ LONG_MERGING_COMPARATOR
+ );
+ final SimpleDictionaryMergingIterator<Double> doubleIterator = new
SimpleDictionaryMergingIterator<>(
+ sortedDoubleLookups,
+ DOUBLE_MERGING_COMPARATOR
+ );
+ final ArrayDictionaryMergingIterator arrayIterator = new
ArrayDictionaryMergingIterator(
+ sortedArrayLookups,
+ defaultSerializer.getGlobalLookup()
+ );
+ defaultSerializer.serializeDictionaries(
+ () -> stringIterator,
+ () -> longIterator,
+ () -> doubleIterator,
+ () -> arrayIterator
+ );
+ stringCardinality = stringIterator.getCardinality();
+ longCardinality = longIterator.getCardinality();
+ doubleCardinality = doubleIterator.getCardinality();
+ arrayCardinality = arrayIterator.getCardinality();
}
- boolean allNulls = allNull(dimValues.getSortedStrings()) &&
- allNull(dimValues.getSortedLongs()) &&
- allNull(dimValues.getSortedDoubles());
- sortedLookup = dimValues;
- if (!allNulls) {
- sortedLookups[i] = dimValues.getSortedStrings();
- sortedLongLookups[i] = dimValues.getSortedLongs();
- sortedDoubleLookups[i] = dimValues.getSortedDoubles();
- numMergeIndex++;
- }
- }
-
- int cardinality = 0;
- descriptorBuilder = new ColumnDescriptor.Builder();
-
- final NestedDataColumnSerializer defaultSerializer = new
NestedDataColumnSerializer(
- name,
- indexSpec,
- segmentWriteOutMedium,
- progressIndicator,
- closer
- );
- serializer = defaultSerializer;
-
- final ComplexColumnPartSerde partSerde =
ComplexColumnPartSerde.serializerBuilder()
-
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
-
.withDelegate(serializer)
- .build();
- descriptorBuilder.setValueType(ValueType.COMPLEX)
- .setHasMultipleValues(false)
- .addSerde(partSerde);
-
- defaultSerializer.open();
- defaultSerializer.serializeFields(mergedFields);
-
- if (numMergeIndex > 1) {
- SimpleDictionaryMergingIterator<String> dictionaryMergeIterator = new
SimpleDictionaryMergingIterator<>(
- sortedLookups,
- STRING_MERGING_COMPARATOR
+ log.debug(
+ "Completed dim[%s] conversions with string cardinality[%,d], long
cardinality[%,d], double cardinality[%,d], array cardinality[%,d] in %,d
millis.",
+ name,
+ stringCardinality,
+ longCardinality,
+ doubleCardinality,
+ arrayCardinality,
+ System.currentTimeMillis() - dimStartTime
);
- SimpleDictionaryMergingIterator<Long> longDictionaryMergeIterator = new
SimpleDictionaryMergingIterator<>(
- sortedLongLookups,
- LONG_MERGING_COMPARATOR
- );
- SimpleDictionaryMergingIterator<Double> doubleDictionaryMergeIterator =
new SimpleDictionaryMergingIterator<>(
- sortedDoubleLookups,
- DOUBLE_MERGING_COMPARATOR
- );
- defaultSerializer.serializeStringDictionary(() ->
dictionaryMergeIterator);
- defaultSerializer.serializeLongDictionary(() ->
longDictionaryMergeIterator);
- defaultSerializer.serializeDoubleDictionary(() ->
doubleDictionaryMergeIterator);
- cardinality = dictionaryMergeIterator.getCardinality();
- } else if (numMergeIndex == 1) {
-
defaultSerializer.serializeStringDictionary(sortedLookup.getSortedStrings());
- defaultSerializer.serializeLongDictionary(sortedLookup.getSortedLongs());
-
defaultSerializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
- cardinality = sortedLookup.size();
}
-
- log.debug(
- "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
- name,
- cardinality,
- System.currentTimeMillis() - dimStartTime
- );
+ catch (Throwable ioe) {
Review Comment:
Catching `Throwable` is dangerous; why cast such a wide net?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]