imply-cheddar commented on code in PR #13803:
URL: https://github.com/apache/druid/pull/13803#discussion_r1148769259


##########
processing/src/main/java/org/apache/druid/math/expr/ExprEval.java:
##########
@@ -370,6 +370,18 @@ public static ExprEval ofComplex(ExpressionType 
outputType, @Nullable Object val
     return new ComplexExprEval(outputType, value);
   }
 
+  public static ExprEval bestEffortArray(@Nullable List<?> theList)
+  {
+    // do not convert empty lists to arrays with a single null element here, 
because that should have been done
+    // by the selectors preparing their ObjectBindings if necessary. If we get 
to this point it was legitimately
+    // empty
+    NonnullPair<ExpressionType, Object[]> coerced = coerceListToArray(theList, 
false);
+    if (coerced == null) {
+      return bestEffortOf(null);

Review Comment:
   Calling `bestEffortOf(null)` looks to me like it's going to go through like 
15 if statements, failing them all before just returning `new 
StringExprEval(null)`.  Why not just return the good thing here given that we 
already know what it should be and avoid the potential branches?



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
     };
   }
 
-  static class LiteralFieldIndexer
+  static class FieldIndexer
   {
     private final GlobalDimensionDictionary globalDimensionDictionary;
-    private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+    private final NestedFieldTypeInfo.MutableTypeSet typeSet;
 
-    LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+    FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
     {
       this.globalDimensionDictionary = globalDimensionDictionary;
-      this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+      this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
     }
 
-    private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable 
Object value)
+    private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?> 
eval)
     {
-      // null value is always added to the global dictionary as id 0, so we 
can ignore them here
-      if (value != null) {
-        // why not
-        ExprEval<?> eval = ExprEval.bestEffortOf(value);
-        final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
-        switch (columnType.getType()) {
-          case LONG:
-            globalDimensionDictionary.addLongValue(eval.asLong());
-            typeSet.add(ColumnType.LONG);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asLong(),
-                StructuredDataProcessor.getLongObjectEstimateSize()
-            );
-          case DOUBLE:
-            globalDimensionDictionary.addDoubleValue(eval.asDouble());
-            typeSet.add(ColumnType.DOUBLE);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asDouble(),
-                StructuredDataProcessor.getDoubleObjectEstimateSize()
-            );
-          case STRING:
-          default:
-            final String asString = eval.asString();
-            globalDimensionDictionary.addStringValue(asString);
-            typeSet.add(ColumnType.STRING);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asString(),
-                StructuredDataProcessor.estimateStringSize(asString)
-            );
-        }
+      final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+      int sizeEstimate;
+      switch (columnType.getType()) {
+        case LONG:
+          typeSet.add(ColumnType.LONG);
+          sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), 
sizeEstimate);
+        case DOUBLE:
+          typeSet.add(ColumnType.DOUBLE);
+          sizeEstimate = 
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), 
sizeEstimate);
+        case ARRAY:
+          // sanity check, this should never happen
+          Preconditions.checkNotNull(
+              columnType.getElementType(),
+              "Array type [%s] for value [%s] missing element type, how did 
this possibly happen?",
+              eval.type(),
+              eval.valueOrDefault()
+          );
+          switch (columnType.getElementType().getType()) {
+            case LONG:
+              typeSet.add(ColumnType.LONG_ARRAY);
+              final Object[] longArray = eval.asArray();
+              sizeEstimate = globalDimensionDictionary.addLongArray(longArray);
+              return new StructuredDataProcessor.ProcessedValue<>(longArray, 
sizeEstimate);
+            case DOUBLE:
+              typeSet.add(ColumnType.DOUBLE_ARRAY);
+              final Object[] doubleArray = eval.asArray();
+              sizeEstimate = 
globalDimensionDictionary.addDoubleArray(doubleArray);
+              return new StructuredDataProcessor.ProcessedValue<>(doubleArray, 
sizeEstimate);
+            case STRING:
+              final Object[] stringArray = eval.asArray();
+              // empty arrays and arrays with all nulls are detected as string 
arrays, but dont count them as part of
+              // the type set
+              if (stringArray.length > 0 && 
!Arrays.stream(stringArray).allMatch(Objects::isNull)) {
+                typeSet.add(ColumnType.STRING_ARRAY);
+              }
+              sizeEstimate = 
globalDimensionDictionary.addStringArray(stringArray);
+              return new StructuredDataProcessor.ProcessedValue<>(stringArray, 
sizeEstimate);
+            default:
+              throw new IAE("Unhandled type: %s", columnType);
+          }
+        case STRING:
+        default:

Review Comment:
   What's an example of when we hit the `default` case?  I'm legitimately 
asking because I do not know the answer and find myself wondering if believing 
that it's a String is really the right thing to do versus generating a parsing 
error or something of that nature.



##########
processing/src/main/java/org/apache/druid/segment/nested/NestedFieldTypeInfo.java:
##########
@@ -31,29 +32,36 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import java.util.Objects;
 import java.util.Set;
 
 /**
  * Binary serialization for nested field type info, translated into this 
compact format for storage in segments.
  * The index of the type info here is the same as the field index in {@link 
CompressedNestedDataComplexColumn#fields}
  */
-public class NestedLiteralTypeInfo
+public class NestedFieldTypeInfo
 {
   private static final byte STRING_MASK = 1;
   private static final byte LONG_MASK = 1 << 2;
   private static final byte DOUBLE_MASK = 1 << 3;
 
-  public static NestedLiteralTypeInfo read(ByteBuffer buffer, int length)
+  private static final byte STRING_ARRAY_MASK = 1 << 4;
+
+  private static final byte LONG_ARRAY_MASK = 1 << 5;
+
+  private static final byte DOUBLE_ARRAY_MASK = 1 << 6;

Review Comment:
   I dunno if I'm being stingy, but couldn't we have an `array` mask that is 
applied and then we check the type mask after that?  Would mean that we can add 
array support to this and only consume 1 more bit instead of consuming 3 more 
bits.



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIntArrayIndexed.java:
##########
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing int arrays, which must be sorted 
and unique, using 'front coding'.
+ *
+ * Front coding is a type of delta encoding, where sorted values are grouped 
into buckets. The first value of the bucket
+ * is written entirely, and remaining values are stored as a pair of an 
integer which indicates how much of the first
+ * int array of the bucket to use as a prefix, followed by the remaining ints 
after the prefix to complete the value.
+ *
+ * front coded indexed layout:
+ * | version | bucket size | has null? | number of values | size of "offsets" 
+ "buckets" | "offsets" | "buckets" |
+ * | ------- | ----------- | --------- | ---------------- | 
----------------------------- | --------- | --------- |
+ * |    byte |        byte |      byte |        vbyte int |                    
 vbyte int |     int[] |  bucket[] |
+ *
+ * "offsets" are the ending offsets of each bucket stored in order, stored as 
plain integers for easy random access.
+ *
+ * bucket layout:
+ * | first value | prefix length | fragment | ... | prefix length | fragment |
+ * | ----------- | ------------- | -------- | --- | ------------- | -------- |
+ * |       int[] |     vbyte int |    int[] | ... |     vbyte int |    int[] |
+ *
+ * int array layout:
+ * | length      |  ints |
+ * | ----------- | ----- |
+ * |   vbyte int | int[] |
+ *
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ *
+ * This class is not thread-safe since during operation modifies positions of 
a shared buffer.
+ */
+public final class FrontCodedIntArrayIndexed implements Indexed<int[]>
+{
+  public static Supplier<FrontCodedIntArrayIndexed> read(ByteBuffer buffer, 
ByteOrder ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get());
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    return () -> new FrontCodedIntArrayIndexed(
+        buffer,
+        ordering,
+        bucketSize,
+        numValues,
+        hasNull,
+        offsetsPosition
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIntArrayIndexed(
+      ByteBuffer buffer,
+      ByteOrder order,
+      int bucketSize,
+      int numValues,
+      boolean hasNull,
+      int offsetsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer.asReadOnlyBuffer().order(order);
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+
+    this.numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    this.adjustIndex = hasNull ? 1 : 0;
+    this.adjustedNumValues = numValues + adjustIndex;
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+    this.rem = bucketSize - 1;
+    this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues 
& rem;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * 
Integer.BYTES);
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public int[] get(int index)
+  {
+    if (hasNull && index == 0) {
+      return null;
+    }
+    Indexed.checkIndex(index, adjustedNumValues);
+
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index

Review Comment:
   If I understood correctly, can I suggest updating the comment to be
   
   ```
   // For arrays, there is a conundrum of how to encode null different from the 
empty array.  In this code, 
   // we take the following approach:
   //   1) the 0 entry in our dictionary is assumed to be indicative of a 
completely null entry
   //   2) The 1 value in our dictionary is assumed to be the empty array
   //   3) Instead of storing the 0 value in this dictionary, we employ an 
indexing-shifting technique, where the
   //       dictionary never stores null, and starts with the empty array 
(index 1), but shifted by 1 such that what
   //       is persisted on disk is actually 0 instead of 1.
   // adjustIndex represents whether the null existed and thus whether we 
actually need to adjust the value
   ```
   



##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -81,46 +87,75 @@ public class NestedDataColumnSerializer implements 
GenericColumnSerializer<Struc
   private final StructuredDataProcessor fieldProcessor = new 
StructuredDataProcessor()
   {
     @Override
-    public StructuredDataProcessor.ProcessedLiteral<?> 
processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
+    public ProcessedValue<?> processField(ArrayList<NestedPathPart> fieldPath, 
@Nullable Object fieldValue)
     {
       final GlobalDictionaryEncodedFieldColumnWriter<?> writer = 
fieldWriters.get(
           NestedPathFinder.toNormalizedJsonPath(fieldPath)
       );
       if (writer != null) {
         try {
-          ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
+          final ExprEval<?> eval = ExprEval.bestEffortOf(fieldValue);
           if (eval.type().isPrimitive() || (eval.type().isArray() && 
eval.type().getElementType().isPrimitive())) {
             writer.addValue(rowCount, eval.value());
           } else {
             // behave consistently with nested column indexer, which defaults 
to string
             writer.addValue(rowCount, eval.asString());
           }
           // serializer doesn't use size estimate
-          return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
+          return ProcessedValue.NULL_LITERAL;
         }
         catch (IOException e) {
-          throw new RuntimeException(":(");
+          throw new RE(e, "Failed to write field [%s] value [%s]", fieldPath, 
fieldValue);
         }
       }
-      return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
+      return ProcessedValue.NULL_LITERAL;
+    }
+
+    @Nullable
+    @Override
+    public ProcessedValue<?> processArrayField(
+        ArrayList<NestedPathPart> fieldPath,
+        @Nullable List<?> array
+    )
+    {
+      final ExprEval<?> eval = ExprEval.bestEffortArray(array);
+      if (eval.type().isArray() && eval.type().getElementType().isPrimitive()) 
{
+        final GlobalDictionaryEncodedFieldColumnWriter<?> writer = 
fieldWriters.get(
+            NestedPathFinder.toNormalizedJsonPath(fieldPath)
+        );
+        if (writer != null) {
+          try {
+            writer.addValue(rowCount, eval.value());
+            // serializer doesn't use size estimate
+            return ProcessedValue.NULL_LITERAL;
+          }
+          catch (IOException e) {
+            throw new RE(e, "Failed to write field [%s] value [%s]", 
fieldPath, array);

Review Comment:
   This exception leaks the content of the data if it ever gets built.  I think 
the "best" we can do here is mention which field it was trying to write.



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -234,7 +272,9 @@ private GlobalDictionarySortedCollector 
getSortedIndexFromV1QueryableAdapterNest
     return new GlobalDictionarySortedCollector(
         new 
StringEncodingStrategies.Utf8ToStringIndexed(column.getStringDictionary()),
         column.getLongDictionary(),
-        column.getDoubleDictionary()
+        column.getDoubleDictionary(),
+        column.getArraysIterable(),
+        column.getArrayDictionary().size()

Review Comment:
   Sure, doesn't have to be an `Indexed`, just also seemed weird to be passing 
in fully encapsulated objects above and then suddenly start passing in 2 
arguments to wrap a new thing that seems similar to the ones above...



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
     };
   }
 
-  static class LiteralFieldIndexer
+  static class FieldIndexer
   {
     private final GlobalDimensionDictionary globalDimensionDictionary;
-    private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+    private final NestedFieldTypeInfo.MutableTypeSet typeSet;
 
-    LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+    FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
     {
       this.globalDimensionDictionary = globalDimensionDictionary;
-      this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+      this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
     }
 
-    private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable 
Object value)
+    private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?> 
eval)
     {
-      // null value is always added to the global dictionary as id 0, so we 
can ignore them here
-      if (value != null) {
-        // why not
-        ExprEval<?> eval = ExprEval.bestEffortOf(value);
-        final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
-        switch (columnType.getType()) {
-          case LONG:
-            globalDimensionDictionary.addLongValue(eval.asLong());
-            typeSet.add(ColumnType.LONG);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asLong(),
-                StructuredDataProcessor.getLongObjectEstimateSize()
-            );
-          case DOUBLE:
-            globalDimensionDictionary.addDoubleValue(eval.asDouble());
-            typeSet.add(ColumnType.DOUBLE);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asDouble(),
-                StructuredDataProcessor.getDoubleObjectEstimateSize()
-            );
-          case STRING:
-          default:
-            final String asString = eval.asString();
-            globalDimensionDictionary.addStringValue(asString);
-            typeSet.add(ColumnType.STRING);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asString(),
-                StructuredDataProcessor.estimateStringSize(asString)
-            );
-        }
+      final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+      int sizeEstimate;
+      switch (columnType.getType()) {
+        case LONG:
+          typeSet.add(ColumnType.LONG);
+          sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), 
sizeEstimate);
+        case DOUBLE:
+          typeSet.add(ColumnType.DOUBLE);
+          sizeEstimate = 
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), 
sizeEstimate);
+        case ARRAY:
+          // sanity check, this should never happen
+          Preconditions.checkNotNull(
+              columnType.getElementType(),
+              "Array type [%s] for value [%s] missing element type, how did 
this possibly happen?",
+              eval.type(),
+              eval.valueOrDefault()
+          );

Review Comment:
   Nit: I think this would be better as an `if` statement.  If this ever gets 
thrown out, someone is gonna want to attach a debugger to the point that this 
gets thrown from and they are gonna need to convert it to an if statement to be 
able to do that without setting some conditions and stuff on their debug 
breakpoint.



##########
sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java:
##########
@@ -190,6 +190,7 @@ public static void setupNullValues()
 
   public static final Map<String, Object> QUERY_CONTEXT_NO_STRINGIFY_ARRAY =
       
DEFAULT_QUERY_CONTEXT_BUILDER.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
+                                   .put(PlannerContext.CTX_ENABLE_UNNEST, true)

Review Comment:
   Not sure how much I care about this, but I expected this to be put on the 
Unnest/Array tests instead of across all of the tests in this base class.



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnIndexer.java:
##########
@@ -426,55 +471,71 @@ public Class<?> classOfObject()
     };
   }
 
-  static class LiteralFieldIndexer
+  static class FieldIndexer
   {
     private final GlobalDimensionDictionary globalDimensionDictionary;
-    private final NestedLiteralTypeInfo.MutableTypeSet typeSet;
+    private final NestedFieldTypeInfo.MutableTypeSet typeSet;
 
-    LiteralFieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
+    FieldIndexer(GlobalDimensionDictionary globalDimensionDictionary)
     {
       this.globalDimensionDictionary = globalDimensionDictionary;
-      this.typeSet = new NestedLiteralTypeInfo.MutableTypeSet();
+      this.typeSet = new NestedFieldTypeInfo.MutableTypeSet();
     }
 
-    private StructuredDataProcessor.ProcessedLiteral<?> processValue(@Nullable 
Object value)
+    private StructuredDataProcessor.ProcessedValue<?> processValue(ExprEval<?> 
eval)
     {
-      // null value is always added to the global dictionary as id 0, so we 
can ignore them here
-      if (value != null) {
-        // why not
-        ExprEval<?> eval = ExprEval.bestEffortOf(value);
-        final ColumnType columnType = ExpressionType.toColumnType(eval.type());
-
-        switch (columnType.getType()) {
-          case LONG:
-            globalDimensionDictionary.addLongValue(eval.asLong());
-            typeSet.add(ColumnType.LONG);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asLong(),
-                StructuredDataProcessor.getLongObjectEstimateSize()
-            );
-          case DOUBLE:
-            globalDimensionDictionary.addDoubleValue(eval.asDouble());
-            typeSet.add(ColumnType.DOUBLE);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asDouble(),
-                StructuredDataProcessor.getDoubleObjectEstimateSize()
-            );
-          case STRING:
-          default:
-            final String asString = eval.asString();
-            globalDimensionDictionary.addStringValue(asString);
-            typeSet.add(ColumnType.STRING);
-            return new StructuredDataProcessor.ProcessedLiteral<>(
-                eval.asString(),
-                StructuredDataProcessor.estimateStringSize(asString)
-            );
-        }
+      final ColumnType columnType = ExpressionType.toColumnType(eval.type());
+      int sizeEstimate;
+      switch (columnType.getType()) {
+        case LONG:
+          typeSet.add(ColumnType.LONG);
+          sizeEstimate = globalDimensionDictionary.addLongValue(eval.asLong());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asLong(), 
sizeEstimate);
+        case DOUBLE:
+          typeSet.add(ColumnType.DOUBLE);
+          sizeEstimate = 
globalDimensionDictionary.addDoubleValue(eval.asDouble());
+          return new StructuredDataProcessor.ProcessedValue<>(eval.asDouble(), 
sizeEstimate);
+        case ARRAY:
+          // sanity check, this should never happen
+          Preconditions.checkNotNull(
+              columnType.getElementType(),
+              "Array type [%s] for value [%s] missing element type, how did 
this possibly happen?",
+              eval.type(),
+              eval.valueOrDefault()
+          );
+          switch (columnType.getElementType().getType()) {
+            case LONG:
+              typeSet.add(ColumnType.LONG_ARRAY);
+              final Object[] longArray = eval.asArray();
+              sizeEstimate = globalDimensionDictionary.addLongArray(longArray);
+              return new StructuredDataProcessor.ProcessedValue<>(longArray, 
sizeEstimate);
+            case DOUBLE:
+              typeSet.add(ColumnType.DOUBLE_ARRAY);
+              final Object[] doubleArray = eval.asArray();
+              sizeEstimate = 
globalDimensionDictionary.addDoubleArray(doubleArray);
+              return new StructuredDataProcessor.ProcessedValue<>(doubleArray, 
sizeEstimate);
+            case STRING:
+              final Object[] stringArray = eval.asArray();
+              // empty arrays and arrays with all nulls are detected as string 
arrays, but dont count them as part of
+              // the type set
+              if (stringArray.length > 0 && 
!Arrays.stream(stringArray).allMatch(Objects::isNull)) {
+                typeSet.add(ColumnType.STRING_ARRAY);
+              }
+              sizeEstimate = 
globalDimensionDictionary.addStringArray(stringArray);
+              return new StructuredDataProcessor.ProcessedValue<>(stringArray, 
sizeEstimate);
+            default:
+              throw new IAE("Unhandled type: %s", columnType);

Review Comment:
   Just double checking, is this where we expect an array-of-arrays or an 
array-of-objects to end up?



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,99 +94,129 @@ public NestedDataColumnMerger(
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
+    try {
+      long dimStartTime = System.currentTimeMillis();
+
+      int numMergeIndex = 0;
+      GlobalDictionarySortedCollector sortedLookup = null;
+      final Indexed[] sortedLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+      final Iterable<Object[]>[] sortedArrayLookups = new 
Iterable[adapters.size()];
+
+      final SortedMap<String, NestedFieldTypeInfo.MutableTypeSet> mergedFields 
= new TreeMap<>();
+
+      for (int i = 0; i < adapters.size(); i++) {
+        final IndexableAdapter adapter = adapters.get(i);
+        final GlobalDictionarySortedCollector dimValues;
+        if (adapter instanceof IncrementalIndexAdapter) {
+          dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
+        } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+          dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+        } else {
+          throw new ISE("Unable to merge columns of unsupported adapter [%s]", 
adapter.getClass());
+        }
 
-    long dimStartTime = System.currentTimeMillis();
-
-    int numMergeIndex = 0;
-    GlobalDictionarySortedCollector sortedLookup = null;
-    final Indexed[] sortedLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+        boolean allNulls = dimValues == null || dimValues.allNull();
+        sortedLookup = dimValues;
+        if (!allNulls) {
+          sortedLookups[i] = dimValues.getSortedStrings();
+          sortedLongLookups[i] = dimValues.getSortedLongs();
+          sortedDoubleLookups[i] = dimValues.getSortedDoubles();
+          sortedArrayLookups[i] = dimValues.getSortedArrays();
+          numMergeIndex++;
+        }
+      }
 
-    final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields 
= new TreeMap<>();
+      descriptorBuilder = new ColumnDescriptor.Builder();
 
-    for (int i = 0; i < adapters.size(); i++) {
-      final IndexableAdapter adapter = adapters.get(i);
-      final GlobalDictionarySortedCollector dimValues;
-      if (adapter instanceof IncrementalIndexAdapter) {
-        dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
-      } else if (adapter instanceof QueryableIndexIndexableAdapter) {
-        dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+      final NestedDataColumnSerializer defaultSerializer = new 
NestedDataColumnSerializer(
+          name,
+          indexSpec,
+          segmentWriteOutMedium,
+          progressIndicator,
+          closer
+      );
+      serializer = defaultSerializer;
+
+      final ComplexColumnPartSerde partSerde = 
ComplexColumnPartSerde.serializerBuilder()
+                                                                     
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
+                                                                     
.withDelegate(serializer)
+                                                                     .build();
+      descriptorBuilder.setValueType(ValueType.COMPLEX)
+                       .setHasMultipleValues(false)
+                       .addSerde(partSerde);
+
+      defaultSerializer.open();
+      defaultSerializer.serializeFields(mergedFields);
+
+      int stringCardinality;
+      int longCardinality;
+      int doubleCardinality;
+      int arrayCardinality;
+      if (numMergeIndex == 1) {
+        defaultSerializer.serializeDictionaries(
+            sortedLookup.getSortedStrings(),
+            sortedLookup.getSortedLongs(),
+            sortedLookup.getSortedDoubles(),
+            () -> new ArrayDictionaryMergingIterator(
+                sortedArrayLookups,
+                defaultSerializer.getGlobalLookup()
+            )
+        );
+        stringCardinality = sortedLookup.getStringCardinality();
+        longCardinality = sortedLookup.getLongCardinality();
+        doubleCardinality = sortedLookup.getDoubleCardinality();
+        arrayCardinality = sortedLookup.getArrayCardinality();
       } else {
-        throw new ISE("Unable to merge columns of unsupported adapter %s", 
adapter.getClass());
+        final SimpleDictionaryMergingIterator<String> stringIterator = new 
SimpleDictionaryMergingIterator<>(
+            sortedLookups,
+            STRING_MERGING_COMPARATOR
+        );
+        final SimpleDictionaryMergingIterator<Long> longIterator = new 
SimpleDictionaryMergingIterator<>(
+            sortedLongLookups,
+            LONG_MERGING_COMPARATOR
+        );
+        final SimpleDictionaryMergingIterator<Double> doubleIterator = new 
SimpleDictionaryMergingIterator<>(
+            sortedDoubleLookups,
+            DOUBLE_MERGING_COMPARATOR
+        );
+        final ArrayDictionaryMergingIterator arrayIterator = new 
ArrayDictionaryMergingIterator(
+            sortedArrayLookups,
+            defaultSerializer.getGlobalLookup()
+        );
+        defaultSerializer.serializeDictionaries(
+            () -> stringIterator,
+            () -> longIterator,
+            () -> doubleIterator,
+            () -> arrayIterator
+        );
+        stringCardinality = stringIterator.getCardinality();
+        longCardinality = longIterator.getCardinality();
+        doubleCardinality = doubleIterator.getCardinality();
+        arrayCardinality = arrayIterator.getCardinality();
       }
 
-      boolean allNulls = allNull(dimValues.getSortedStrings()) &&
-                         allNull(dimValues.getSortedLongs()) &&
-                         allNull(dimValues.getSortedDoubles());
-      sortedLookup = dimValues;
-      if (!allNulls) {
-        sortedLookups[i] = dimValues.getSortedStrings();
-        sortedLongLookups[i] = dimValues.getSortedLongs();
-        sortedDoubleLookups[i] = dimValues.getSortedDoubles();
-        numMergeIndex++;
-      }
-    }
-
-    int cardinality = 0;
-    descriptorBuilder = new ColumnDescriptor.Builder();
-
-    final NestedDataColumnSerializer defaultSerializer = new 
NestedDataColumnSerializer(
-        name,
-        indexSpec,
-        segmentWriteOutMedium,
-        progressIndicator,
-        closer
-    );
-    serializer = defaultSerializer;
-
-    final ComplexColumnPartSerde partSerde = 
ComplexColumnPartSerde.serializerBuilder()
-                                                                   
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
-                                                                   
.withDelegate(serializer)
-                                                                   .build();
-    descriptorBuilder.setValueType(ValueType.COMPLEX)
-                     .setHasMultipleValues(false)
-                     .addSerde(partSerde);
-
-    defaultSerializer.open();
-    defaultSerializer.serializeFields(mergedFields);
-
-    if (numMergeIndex > 1) {
-      SimpleDictionaryMergingIterator<String> dictionaryMergeIterator = new 
SimpleDictionaryMergingIterator<>(
-          sortedLookups,
-          STRING_MERGING_COMPARATOR
+      log.debug(
+          "Completed dim[%s] conversions with string cardinality[%,d], long 
cardinality[%,d], double cardinality[%,d], array cardinality[%,d] in %,d 
millis.",
+          name,
+          stringCardinality,
+          longCardinality,
+          doubleCardinality,
+          arrayCardinality,
+          System.currentTimeMillis() - dimStartTime
       );
-      SimpleDictionaryMergingIterator<Long> longDictionaryMergeIterator = new 
SimpleDictionaryMergingIterator<>(
-          sortedLongLookups,
-          LONG_MERGING_COMPARATOR
-      );
-      SimpleDictionaryMergingIterator<Double> doubleDictionaryMergeIterator = 
new SimpleDictionaryMergingIterator<>(
-          sortedDoubleLookups,
-          DOUBLE_MERGING_COMPARATOR
-      );
-      defaultSerializer.serializeStringDictionary(() -> 
dictionaryMergeIterator);
-      defaultSerializer.serializeLongDictionary(() -> 
longDictionaryMergeIterator);
-      defaultSerializer.serializeDoubleDictionary(() -> 
doubleDictionaryMergeIterator);
-      cardinality = dictionaryMergeIterator.getCardinality();
-    } else if (numMergeIndex == 1) {
-      
defaultSerializer.serializeStringDictionary(sortedLookup.getSortedStrings());
-      defaultSerializer.serializeLongDictionary(sortedLookup.getSortedLongs());
-      
defaultSerializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
-      cardinality = sortedLookup.size();
     }
-
-    log.debug(
-        "Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
-        name,
-        cardinality,
-        System.currentTimeMillis() - dimStartTime
-    );
+    catch (Throwable ioe) {

Review Comment:
   Catching `Throwable` is dangerous, why cast such a wide net?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to