imply-cheddar commented on code in PR #13803:
URL: https://github.com/apache/druid/pull/13803#discussion_r1141568494


##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,93 +94,123 @@ public NestedDataColumnMerger(
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
+    try {
+      long dimStartTime = System.currentTimeMillis();
+
+      int numMergeIndex = 0;
+      GlobalDictionarySortedCollector sortedLookup = null;
+      final Indexed[] sortedLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+      final Iterable<Object[]>[] sortedArrayLookups = new 
Iterable[adapters.size()];
+
+      final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> 
mergedFields = new TreeMap<>();
+
+      for (int i = 0; i < adapters.size(); i++) {
+        final IndexableAdapter adapter = adapters.get(i);
+        final GlobalDictionarySortedCollector dimValues;
+        if (adapter instanceof IncrementalIndexAdapter) {
+          dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
+        } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+          dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+        } else {
+          throw new ISE("Unable to merge columns of unsupported adapter %s", 
adapter.getClass());

Review Comment:
   Please wrap the interpolated value in `[]`, i.e. `adapter [%s]`.



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,93 +94,123 @@ public NestedDataColumnMerger(
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
+    try {
+      long dimStartTime = System.currentTimeMillis();
+
+      int numMergeIndex = 0;
+      GlobalDictionarySortedCollector sortedLookup = null;
+      final Indexed[] sortedLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+      final Iterable<Object[]>[] sortedArrayLookups = new 
Iterable[adapters.size()];
+
+      final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> 
mergedFields = new TreeMap<>();
+
+      for (int i = 0; i < adapters.size(); i++) {
+        final IndexableAdapter adapter = adapters.get(i);
+        final GlobalDictionarySortedCollector dimValues;
+        if (adapter instanceof IncrementalIndexAdapter) {
+          dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
+        } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+          dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+        } else {
+          throw new ISE("Unable to merge columns of unsupported adapter %s", 
adapter.getClass());
+        }
 
-    long dimStartTime = System.currentTimeMillis();
-
-    int numMergeIndex = 0;
-    GlobalDictionarySortedCollector sortedLookup = null;
-    final Indexed[] sortedLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+        boolean allNulls = dimValues == null || 
allNull(dimValues.getSortedStrings()) &&
+                                                
allNull(dimValues.getSortedLongs()) &&
+                                                
allNull(dimValues.getSortedDoubles()) &&
+                                                
dimValues.getArrayCardinality() == 0;

Review Comment:
   This seems like a nice check to delegate to 
`GlobalDictionarySortedCollector` instead of implementing here?
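
   A minimal sketch of what that delegation could look like. 
`DictionaryCollectorSketch` is a hypothetical stand-in for 
`GlobalDictionarySortedCollector` that uses plain `List`s instead of `Indexed`; 
the `allNull()` and `isNullOrAllNull()` names are assumptions, not existing 
methods.

```java
import java.util.List;

// Hypothetical stand-in for GlobalDictionarySortedCollector.
// Assumption: plain Lists are used here; the real class holds Indexed values.
final class DictionaryCollectorSketch
{
  private final List<String> sortedStrings;
  private final List<Long> sortedLongs;
  private final List<Double> sortedDoubles;
  private final int arrayCardinality;

  DictionaryCollectorSketch(
      List<String> sortedStrings,
      List<Long> sortedLongs,
      List<Double> sortedDoubles,
      int arrayCardinality
  )
  {
    this.sortedStrings = sortedStrings;
    this.sortedLongs = sortedLongs;
    this.sortedDoubles = sortedDoubles;
    this.arrayCardinality = arrayCardinality;
  }

  // True when no dictionary holds a non-null value and there are no arrays.
  boolean allNull()
  {
    return allNull(sortedStrings)
           && allNull(sortedLongs)
           && allNull(sortedDoubles)
           && arrayCardinality == 0;
  }

  // Null-safe entry point, so the caller does not need its own null check.
  static boolean isNullOrAllNull(DictionaryCollectorSketch collector)
  {
    return collector == null || collector.allNull();
  }

  private static boolean allNull(List<?> values)
  {
    for (Object value : values) {
      if (value != null) {
        return false;
      }
    }
    return true;
  }
}
```

   With something like this, the loop in the merger could shrink to a single 
call such as `isNullOrAllNull(dimValues)`.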



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -234,7 +272,9 @@ private GlobalDictionarySortedCollector 
getSortedIndexFromV1QueryableAdapterNest
     return new GlobalDictionarySortedCollector(
         new 
StringEncodingStrategies.Utf8ToStringIndexed(column.getStringDictionary()),
         column.getLongDictionary(),
-        column.getDoubleDictionary()
+        column.getDoubleDictionary(),
+        column.getArraysIterable(),
+        column.getArrayDictionary().size()

Review Comment:
   Why pass a two-argument pair of an `Iterable` and a `size()` instead of a 
single collection-style object like the others?



##########
processing/src/main/java/org/apache/druid/segment/data/FrontCodedIntArrayIndexed.java:
##########
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * {@link Indexed} specialized for storing int arrays, which must be sorted 
and unique, using 'front coding'.
+ *
+ * Front coding is a type of delta encoding, where sorted values are grouped 
into buckets. The first value of the bucket
+ * is written entirely, and remaining values are stored as a pair of an 
integer which indicates how much of the first
+ * int array of the bucket to use as a prefix, followed by the remaining ints 
after the prefix to complete the value.
+ *
+ * front coded indexed layout:
+ * | version | bucket size | has null? | number of values | size of "offsets" 
+ "buckets" | "offsets" | "buckets" |
+ * | ------- | ----------- | --------- | ---------------- | 
----------------------------- | --------- | --------- |
+ * |    byte |        byte |      byte |        vbyte int |                    
 vbyte int |     int[] |  bucket[] |
+ *
+ * "offsets" are the ending offsets of each bucket stored in order, stored as 
plain integers for easy random access.
+ *
+ * bucket layout:
+ * | first value | prefix length | fragment | ... | prefix length | fragment |
+ * | ----------- | ------------- | -------- | --- | ------------- | -------- |
+ * |       int[] |     vbyte int |    int[] | ... |     vbyte int |    int[] |
+ *
+ * int array layout:
+ * | length      |  ints |
+ * | ----------- | ----- |
+ * |   vbyte int | int[] |
+ *
+ *
+ * Getting a value first picks the appropriate bucket, finds its offset in the 
underlying buffer, then scans the bucket
+ * values to seek to the correct position of the value within the bucket in 
order to reconstruct it using the prefix
+ * length.
+ *
+ * Finding the index of a value involves binary searching the first values of 
each bucket to find the correct bucket,
+ * then a linear scan within the bucket to find the matching value (or 
negative insertion point -1 for values that
+ * are not present).
+ *
+ * The value iterator reads an entire bucket at a time, reconstructing the 
values into an array to iterate within the
+ * bucket before moving onto the next bucket as the iterator is consumed.
+ *
+ * This class is not thread-safe since during operation modifies positions of 
a shared buffer.
+ */
+public final class FrontCodedIntArrayIndexed implements Indexed<int[]>
+{
+  public static Supplier<FrontCodedIntArrayIndexed> read(ByteBuffer buffer, 
ByteOrder ordering)
+  {
+    final ByteBuffer orderedBuffer = buffer.asReadOnlyBuffer().order(ordering);
+    final byte version = orderedBuffer.get();
+    Preconditions.checkArgument(version == 0, "only V0 exists, encountered " + 
version);
+    final int bucketSize = Byte.toUnsignedInt(orderedBuffer.get());
+    final boolean hasNull = NullHandling.IS_NULL_BYTE == orderedBuffer.get();
+    final int numValues = VByte.readInt(orderedBuffer);
+    // size of offsets + values
+    final int size = VByte.readInt(orderedBuffer);
+    final int offsetsPosition = orderedBuffer.position();
+    // move position to end of buffer
+    buffer.position(offsetsPosition + size);
+
+    return () -> new FrontCodedIntArrayIndexed(
+        buffer,
+        ordering,
+        bucketSize,
+        numValues,
+        hasNull,
+        offsetsPosition
+    );
+  }
+
+  private final ByteBuffer buffer;
+  private final int adjustedNumValues;
+  private final int adjustIndex;
+  private final int bucketSize;
+  private final int numBuckets;
+  private final int div;
+  private final int rem;
+  private final int offsetsPosition;
+  private final int bucketsPosition;
+  private final boolean hasNull;
+  private final int lastBucketNumValues;
+
+  private FrontCodedIntArrayIndexed(
+      ByteBuffer buffer,
+      ByteOrder order,
+      int bucketSize,
+      int numValues,
+      boolean hasNull,
+      int offsetsPosition
+  )
+  {
+    if (Integer.bitCount(bucketSize) != 1) {
+      throw new ISE("bucketSize must be a power of two but was[%,d]", 
bucketSize);
+    }
+    this.buffer = buffer.asReadOnlyBuffer().order(order);
+    this.bucketSize = bucketSize;
+    this.hasNull = hasNull;
+
+    this.numBuckets = (int) Math.ceil((double) numValues / (double) 
bucketSize);
+    this.adjustIndex = hasNull ? 1 : 0;
+    this.adjustedNumValues = numValues + adjustIndex;
+    this.div = Integer.numberOfTrailingZeros(bucketSize);
+    this.rem = bucketSize - 1;
+    this.lastBucketNumValues = (numValues & rem) == 0 ? bucketSize : numValues 
& rem;
+    this.offsetsPosition = offsetsPosition;
+    this.bucketsPosition = offsetsPosition + ((numBuckets - 1) * 
Integer.BYTES);
+  }
+
+  @Override
+  public int size()
+  {
+    return adjustedNumValues;
+  }
+
+  @Nullable
+  @Override
+  public int[] get(int index)
+  {
+    if (hasNull && index == 0) {
+      return null;
+    }
+    Indexed.checkIndex(index, adjustedNumValues);
+
+    // due to vbyte encoding, the null value is not actually stored in the 
bucket (no negative values), so we adjust
+    // the index

Review Comment:
   Isn't `null` == `0`?  Why does `no negative values` mean that `null` is not 
stored in the bucket?



##########
processing/src/main/java/org/apache/druid/segment/nested/ArrayOfLiteralsFieldColumnWriter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class ArrayOfLiteralsFieldColumnWriter extends 
GlobalDictionaryEncodedFieldColumnWriter<int[]>
+{
+
+  protected ArrayOfLiteralsFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, 
globalDictionaryIdLookup);
+  }
+
+  @Override
+  int[] processValue(int row, Object value)
+  {
+    if (value instanceof Object[]) {

Review Comment:
   Would it be good to have a thing that handles `List<>` as well?
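
   For illustration, a small sketch of accepting both shapes. 
`ArrayValueCoercionSketch` and `coerceToArray` are hypothetical names; the real 
`processValue` goes on to translate elements into global dictionary ids, which 
is left out here.

```java
import java.util.List;

// Hypothetical helper, not part of the PR.
final class ArrayValueCoercionSketch
{
  // Returns the value as an Object[] when it is array-like, or null when it is not.
  static Object[] coerceToArray(Object value)
  {
    if (value instanceof Object[]) {
      return (Object[]) value;
    }
    if (value instanceof List) {
      // List.toArray() copies the elements into a fresh Object[].
      return ((List<?>) value).toArray();
    }
    return null;
  }
}
```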



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -327,17 +390,58 @@ public void close()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(List<NestedPathPart> path, 
ReadableOffset readableOffset, ExtractionFn fn)
+  public DimensionSelector makeDimensionSelector(
+      List<NestedPathPart> path,
+      ReadableOffset readableOffset,
+      ExtractionFn fn
+  )
   {
     final String field = getField(path);
     Preconditions.checkNotNull(field, "Null field");
 
     if (fields.indexOf(field) >= 0) {
       DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(field).getColumn();
       return col.makeDimensionSelector(readableOffset, fn);
-    } else {
-      return DimensionSelector.constant(null);
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {
+        final int elementNumber = ((NestedPathArrayElement) 
lastPath).getIndex();
+        if (elementNumber < 0) {
+          throw new IAE("Cannot make array element selector for path [%s], 
negative array index not supported for this selector", path);

Review Comment:
   Would be nice to include the elementNumber that was asked for here.
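
   Something along these lines, as a sketch. It uses plain 
`IllegalArgumentException` and `String.format` so that it is self-contained; 
`ElementNumberCheckSketch` and `checkElementNumber` are hypothetical names, and 
the exact wording of the message is only a suggestion.

```java
import java.util.Locale;

// Hypothetical helper, not part of the PR.
final class ElementNumberCheckSketch
{
  // Returns the element number unchanged, or throws with the offending index in the message.
  static int checkElementNumber(String path, int elementNumber)
  {
    if (elementNumber < 0) {
      throw new IllegalArgumentException(
          String.format(
              Locale.ROOT,
              "Cannot make array element selector for path [%s], negative array index [%d] not supported for this selector",
              path,
              elementNumber
          )
      );
    }
    return elementNumber;
  }
}
```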



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -394,11 +617,127 @@ public VectorValueSelector 
makeVectorValueSelector(List<NestedPathPart> path, Re
     if (fields.indexOf(field) >= 0) {
       BaseColumn col = getColumnHolder(field).getColumn();
       return col.makeVectorValueSelector(readableOffset);
-    } else {
-      return NilVectorSelector.create(readableOffset);
     }
-  }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {
+        final int elementNumber = ((NestedPathArrayElement) 
lastPath).getIndex();
+        if (elementNumber < 0) {
+          throw new IAE("Cannot make array element selector for path [%s], 
negative array index not supported for this selector", path);
+        }
+        DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(arrayField).getColumn();
+        VectorObjectSelector arraySelector = 
col.makeVectorObjectSelector(readableOffset);

Review Comment:
   After seeing this same preamble for the 4th time now in the code, I think 
this code would benefit from using the preamble to build an 
`ArrayBasedSelectorThingieFactory`, which could then have factory methods for 
each of the object types (i.e. `makeDimensionSelector()`, 
`makeColumnValueSelector()`, `makeVectorObjectSelector()`, 
`makeVectorValueSelector()`).  This would allow you to have a single method that 
does the preamble and then use that result to return the thing being asked 
for.
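
   A rough sketch of the shape I have in mind, under assumptions. `PathPart`, 
`FieldPart` and `ArrayElementPart` are hypothetical stand-ins for 
`NestedPathPart` and its subtypes, `fieldName` only approximates what 
`getField` does, and `selectElement` stands in for the real selector factory 
methods.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; none of these types exist in the PR.
final class ArrayElementSelectorFactorySketch
{
  interface PathPart {}

  static final class FieldPart implements PathPart
  {
    final String name;
    FieldPart(String name) { this.name = name; }
  }

  static final class ArrayElementPart implements PathPart
  {
    final int index;
    ArrayElementPart(int index) { this.index = index; }
  }

  final String arrayField;
  final int elementNumber;

  private ArrayElementSelectorFactorySketch(String arrayField, int elementNumber)
  {
    this.arrayField = arrayField;
    this.elementNumber = elementNumber;
  }

  // The shared preamble, written once: empty when the path is not "known array field + element".
  static Optional<ArrayElementSelectorFactorySketch> forPath(List<PathPart> path, Set<String> fields)
  {
    if (path.isEmpty() || !(path.get(path.size() - 1) instanceof ArrayElementPart)) {
      return Optional.empty();
    }
    final String arrayField = fieldName(path.subList(0, path.size() - 1));
    if (!fields.contains(arrayField)) {
      return Optional.empty();
    }
    final int elementNumber = ((ArrayElementPart) path.get(path.size() - 1)).index;
    if (elementNumber < 0) {
      throw new IllegalArgumentException("negative array index [" + elementNumber + "] not supported");
    }
    return Optional.of(new ArrayElementSelectorFactorySketch(arrayField, elementNumber));
  }

  // Stand-in for the per-type factory methods; returns null when the array is too short.
  Object selectElement(Object[] array)
  {
    return array != null && elementNumber < array.length ? array[elementNumber] : null;
  }

  // Assumption: a simplified "$.a.b[0]" style path string.
  private static String fieldName(List<PathPart> parts)
  {
    final StringBuilder sb = new StringBuilder("$");
    for (PathPart part : parts) {
      if (part instanceof FieldPart) {
        sb.append('.').append(((FieldPart) part).name);
      } else if (part instanceof ArrayElementPart) {
        sb.append('[').append(((ArrayElementPart) part).index).append(']');
      }
    }
    return sb.toString();
  }
}
```

   Each of the four `make*Selector` methods could then call `forPath` once and 
fall back to the nil selector when the result is empty.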



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -380,9 +551,61 @@ public VectorObjectSelector 
makeVectorObjectSelector(List<NestedPathPart> path,
     if (fields.indexOf(field) >= 0) {
       BaseColumn col = getColumnHolder(field).getColumn();
       return col.makeVectorObjectSelector(readableOffset);
-    } else {
-      return NilVectorSelector.create(readableOffset);
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {
+        final int elementNumber = ((NestedPathArrayElement) 
lastPath).getIndex();
+        if (elementNumber < 0) {
+          throw new IAE("Cannot make array element selector for path [%s], 
negative array index not supported for this selector", path);
+        }
+        DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(arrayField).getColumn();
+        VectorObjectSelector arraySelector = 
col.makeVectorObjectSelector(readableOffset);
+
+        return new VectorObjectSelector()
+        {
+          private final Object[] elements = new 
Object[arraySelector.getMaxVectorSize()];
+          private int id = ReadableVectorInspector.NULL_ID;
+
+          @Override
+          public Object[] getObjectVector()
+          {
+            if (readableOffset.getId() != id) {
+              final Object[] delegate = arraySelector.getObjectVector();
+              for (int i = 0; i < arraySelector.getCurrentVectorSize(); i++) {

Review Comment:
   If I'm reading this correctly, we are loading up a full `Object[]` only to 
then throw away everything except the `elementNumber` that we want.  That's 
quite wasteful, especially given that we know the element that we care about 
when we are constructing the objects that will do the read.  Let's push down 
the element number all the way to the thing that's doing the read and only read 
the minimum amount of stuff into memory.
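
   To illustrate the difference, here is a simplified sketch. It assumes each 
row is stored as an `int[]` of dictionary ids and that the dictionary is a 
plain `String[]`; both are simplifications of the real column, and 
`ElementPushdownSketch` and its methods are hypothetical.

```java
// Hypothetical sketch; assumes elementNumber has already been checked to be non-negative.
final class ElementPushdownSketch
{
  // Roughly the current shape: decode every element of the row, then keep only one.
  static Object decodeAllThenPick(int[] ids, String[] dictionary, int elementNumber)
  {
    if (ids == null) {
      return null;
    }
    final Object[] decoded = new Object[ids.length];
    for (int i = 0; i < ids.length; i++) {
      decoded[i] = dictionary[ids[i]];
    }
    return elementNumber < decoded.length ? decoded[elementNumber] : null;
  }

  // Pushed down: a single dictionary lookup and no intermediate Object[].
  static Object decodeOnlyElement(int[] ids, String[] dictionary, int elementNumber)
  {
    if (ids == null || elementNumber >= ids.length) {
      return null;
    }
    return dictionary[ids[elementNumber]];
  }
}
```

   Both methods return the same value; the second one just skips the 
allocation and the lookups for elements nobody asked for.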



##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -104,6 +108,32 @@ public StructuredDataProcessor.ProcessedLiteral<?> 
processLiteralField(ArrayList
       }
       return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
     }
+
+    @Nullable
+    @Override
+    public ProcessedLiteral<?> processArrayOfLiteralsField(
+        ArrayList<NestedPathPart> fieldPath,
+        @Nullable Object maybeArrayOfLiterals
+    )
+    {
+      ExprEval<?> eval = ExprEval.bestEffortOf(maybeArrayOfLiterals);
+      if (eval.type().isArray() && eval.type().getElementType().isPrimitive()) 
{
+        final GlobalDictionaryEncodedFieldColumnWriter<?> writer = 
fieldWriters.get(
+            NestedPathFinder.toNormalizedJsonPath(fieldPath)
+        );
+        if (writer != null) {
+          try {
+            writer.addValue(rowCount, eval.value());
+            // serializer doesn't use size estimate
+            return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
+          }
+          catch (IOException e) {
+            throw new RuntimeException(":(");

Review Comment:
   This is not only throwing away the IOException, it's not even attempting to 
generate a nice message.  `:(` :P
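
   As a sketch of what I would expect instead: keep the `IOException` as the 
cause and say what was being written. `ValueWriter` and `addValueOrThrow` are 
hypothetical stand-ins for the field writer call, and the message wording is 
only a suggestion.

```java
import java.io.IOException;
import java.util.Locale;

// Hypothetical sketch, not part of the PR.
final class WriteFailureSketch
{
  // Stand-in for the field writer's addValue(row, value).
  interface ValueWriter
  {
    void addValue(int row, Object value) throws IOException;
  }

  static void addValueOrThrow(ValueWriter writer, String fieldPath, int row, Object value)
  {
    try {
      writer.addValue(row, value);
    }
    catch (IOException e) {
      // Passing e as the cause preserves the original stack trace.
      throw new RuntimeException(
          String.format(Locale.ROOT, "Failed to write value for field [%s] at row [%d]", fieldPath, row),
          e
      );
    }
  }
}
```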



##########
processing/src/test/resources/nested-array-test-data.json:
##########
@@ -0,0 +1,14 @@
+{"timestamp": "2023-01-01T00:00:00", "arrayString": ["a", "b"],       
"arrayStringNulls": ["a", "b"],         "arrayLong":[1, 2, 3],    
"arrayLongNulls":[1, null,3],   "arrayDouble":[1.1, 2.2, 3.3],  
"arrayDoubleNulls":[1.1, 2.2, null],  "arrayVariant":["a", 1, 2.2],     
"arrayObject":[{"x": 1},{"x":2}]}
+{"timestamp": "2023-01-01T00:00:00", "arrayString": ["a", "b", "c"],  
"arrayStringNulls": [null, "b"],        "arrayLong":[2, 3],                     
                  "arrayDouble":[3.3, 4.4, 5.5],  "arrayDoubleNulls":[999, 
null, 5.5],  "arrayVariant":[null, null, 2.2], "arrayObject":[{"x": 3},{"x":4}]}
+{"timestamp": "2023-01-01T00:00:00", "arrayString": ["b", "c"],       
"arrayStringNulls": ["d", null, "b"],   "arrayLong":[1, 2, 3, 4], 
"arrayLongNulls":[1, 2, 3],     "arrayDouble":[1.1, 3.3],       
"arrayDoubleNulls":[null, 2.2, null], "arrayVariant":[1, null, 1],      
"arrayObject":[null,{"x":2}]}
+{"timestamp": "2023-01-01T00:00:00", "arrayString": ["d", "e"],       
"arrayStringNulls": ["b", "b"],         "arrayLong":[1, 4],       
"arrayLongNulls":[1],           "arrayDouble":[2.2, 3.3, 4.0],                  
                      "arrayVariant":["a", "b", "c"],   "arrayObject":[{"x": 
null},{"x":2}]}
+{"timestamp": "2023-01-01T00:00:00", "arrayString": null,                      
                               "arrayLong":[1, 2, 3],    "arrayLongNulls":null, 
         "arrayDouble":[1.1, 2.2, 3.3],  "arrayDoubleNulls":null,               
                                 "arrayObject":[{"x": 1000},{"y":2000}]}
+{"timestamp": "2023-01-01T00:00:00", "arrayString": ["a", "b"],       
"arrayStringNulls": null,                                         
"arrayLongNulls":[null, 2, 9],  "arrayDouble":null,             
"arrayDoubleNulls":[999, 5.5, null],  "arrayVariant":["a", 1, 2.2],     
"arrayObject":[{"a": 1},{"b":2}]}
+{"timestamp": "2023-01-01T00:00:00",                                  
"arrayStringNulls": ["a", "b"],         "arrayLong":null,         
"arrayLongNulls":[2, 3],                                        
"arrayDoubleNulls":[null, 1.1],       "arrayVariant":null,              
"arrayObject":[{"x": 1},{"x":2}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": ["a", "b"],       
"arrayStringNulls": ["a", "b"],         "arrayLong":[1, 2, 3],    
"arrayLongNulls":[1, null,3],   "arrayDouble":[1.1, 2.2, 3.3],  
"arrayDoubleNulls":[1.1, 2.2, null],  "arrayVariant":["a", 1, 2.2],     
"arrayObject":[{"x": 1},{"x":2}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": ["a", "b", "c"],  
"arrayStringNulls": [null, "b"],        "arrayLong":[2, 3],                     
                  "arrayDouble":[3.3, 4.4, 5.5],  "arrayDoubleNulls":[999, 
null, 5.5],  "arrayVariant":[null, null, 2.2], "arrayObject":[{"x": 3},{"x":4}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": ["b", "c"],       
"arrayStringNulls": ["d", null, "b"],   "arrayLong":[1, 2, 3, 4], 
"arrayLongNulls":[1, 2, 3],     "arrayDouble":[1.1, 3.3],       
"arrayDoubleNulls":[null, 2.2, null], "arrayVariant":[1, null, 1],      
"arrayObject":[null,{"x":2}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": ["d", "e"],       
"arrayStringNulls": ["b", "b"],         "arrayLong":[1, 4],       
"arrayLongNulls":[1],           "arrayDouble":[2.2, 3.3, 4.0],                  
                      "arrayVariant":["a", "b", "c"],   "arrayObject":[{"x": 
null},{"x":2}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": null,                      
                               "arrayLong":[1, 2, 3],    "arrayLongNulls":null, 
         "arrayDouble":[1.1, 2.2, 3.3],  "arrayDoubleNulls":null,               
                                 "arrayObject":[{"x": 1000},{"y":2000}]}
+{"timestamp": "2023-01-02T00:00:00", "arrayString": ["a", "b"],       
"arrayStringNulls": null,                                         
"arrayLongNulls":[null, 2, 9],  "arrayDouble":null,             
"arrayDoubleNulls":[999, 5.5, null],  "arrayVariant":["a", 1, 2.2],     
"arrayObject":[{"a": 1},{"b":2}]}
+{"timestamp": "2023-01-02T00:00:00",                                  
"arrayStringNulls": ["a", "b"],         "arrayLong":null,         
"arrayLongNulls":[2, 3],                                        
"arrayDoubleNulls":[null, 1.1],       "arrayVariant":null,              
"arrayObject":[{"x": 1},{"x":2}]}

Review Comment:
   What's the expectation of how arrays of arrays are handled?



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -280,4 +320,122 @@ private <T> boolean allNull(Indexed<T> dimValues)
     }
     return true;
   }
+
+  public static class ArrayDictionaryMergingIterator implements Iterator<int[]>
+  {
+    private static final Comparator<PeekingIterator<int[]>> 
PEEKING_ITERATOR_COMPARATOR =
+        (lhs, rhs) -> 
FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR.compare(lhs.peek(), 
rhs.peek());
+
+    protected final PriorityQueue<PeekingIterator<int[]>> pQueue;
+    protected int counter;
+
+    public ArrayDictionaryMergingIterator(Iterable<Object[]>[] 
dimValueLookups, GlobalDictionaryIdLookup idLookup)
+    {
+      pQueue = new PriorityQueue<>(PEEKING_ITERATOR_COMPARATOR);
+
+      for (Iterable<Object[]> dimValueLookup : dimValueLookups) {
+        if (dimValueLookup == null) {
+          continue;
+        }
+        final PeekingIterator<int[]> iter = Iterators.peekingIterator(
+            new IdLookupArrayIterator(idLookup, dimValueLookup.iterator())
+        );
+        if (iter.hasNext()) {
+          pQueue.add(iter);
+        }
+      }
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return !pQueue.isEmpty();
+    }
+
+    @Override
+    public int[] next()
+    {
+      PeekingIterator<int[]> smallest = pQueue.remove();
+      if (smallest == null) {
+        throw new NoSuchElementException();
+      }
+      final int[] value = smallest.next();
+      if (smallest.hasNext()) {
+        pQueue.add(smallest);
+      }
+
+      while (!pQueue.isEmpty() && Arrays.equals(value, pQueue.peek().peek())) {
+        PeekingIterator<int[]> same = pQueue.remove();
+        same.next();
+        if (same.hasNext()) {
+          pQueue.add(same);
+        }
+      }
+      counter++;
+
+      return value;
+    }
+
+    public int getCardinality()
+    {
+      return counter;
+    }
+
+    @Override
+    public void remove()
+    {
+      throw new UnsupportedOperationException("remove");
+    }
+  }
+
+  public static class IdLookupArrayIterator implements Iterator<int[]>
+  {
+    private final GlobalDictionaryIdLookup idLookup;
+    private final Iterator<Object[]> delegate;
+
+    public IdLookupArrayIterator(
+        GlobalDictionaryIdLookup idLookup,
+        Iterator<Object[]> delegate
+    )
+    {
+      this.idLookup = idLookup;
+      this.delegate = delegate;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return delegate.hasNext();
+    }
+
+    @Override
+    public int[] next()
+    {
+      final Object[] next = delegate.next();
+      if (next == null) {
+        return null;
+      }
+      final int[] newIdsWhoDis = new int[next.length];
+      for (int i = 0; i < next.length; i++) {
+        if (next[i] == null) {
+          newIdsWhoDis[i] = 0;
+        } else if (next[i] instanceof String) {
+          newIdsWhoDis[i] = idLookup.lookupString((String) next[i]);
+        } else if (next[i] instanceof Long) {
+          newIdsWhoDis[i] = idLookup.lookupLong((Long) next[i]);
+        } else if (next[i] instanceof Double) {
+          newIdsWhoDis[i] = idLookup.lookupDouble((Double) next[i]);
+        } else {
+          newIdsWhoDis[i] = -1;
+        }
+        Preconditions.checkArgument(
+            newIdsWhoDis[i] >= 0,
+            "unknown global id [%s] for value [%s]",
+            newIdsWhoDis[i],
+            next[i]
+        );

Review Comment:
   Given that the global dictionaries, once merged, will be in type-sorted 
order, do we really need to convert back into the actual values instead of just 
converting the dictionary id?
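
   One way to read this, sketched under assumptions: build an old-id to new-id 
mapping per segment with a single walk over the two sorted dictionaries, then 
remap each `int[]` directly. The sketch only covers a string dictionary with 
non-null values; `DictionaryIdRemapSketch`, `buildMapping` and `remap` are 
hypothetical names, and the real global dictionary also spans longs and doubles.

```java
import java.util.List;

// Hypothetical sketch, not part of the PR.
final class DictionaryIdRemapSketch
{
  // Assumes both dictionaries are sorted, unique and non-null, and that every
  // segment value also appears in the merged dictionary.
  static int[] buildMapping(List<String> segmentDictionary, List<String> mergedDictionary)
  {
    final int[] mapping = new int[segmentDictionary.size()];
    int mergedId = 0;
    for (int segmentId = 0; segmentId < mapping.length; segmentId++) {
      final String value = segmentDictionary.get(segmentId);
      // Both lists are sorted, so the merged cursor only ever moves forward.
      while (mergedId < mergedDictionary.size() && !mergedDictionary.get(mergedId).equals(value)) {
        mergedId++;
      }
      if (mergedId == mergedDictionary.size()) {
        throw new IllegalArgumentException("value [" + value + "] missing from merged dictionary");
      }
      mapping[segmentId] = mergedId;
    }
    return mapping;
  }

  // Converts an array of segment-local ids to merged ids without touching the values.
  static int[] remap(int[] segmentIds, int[] mapping)
  {
    final int[] mergedIds = new int[segmentIds.length];
    for (int i = 0; i < segmentIds.length; i++) {
      mergedIds[i] = mapping[segmentIds[i]];
    }
    return mergedIds;
  }
}
```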



##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -101,7 +101,7 @@ public Sequence<Cursor> makeCursors(
           Cursor retVal = cursor;
           ColumnCapabilities capabilities = 
cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
           if (capabilities != null) {
-            if 
(capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue())
 {
+            if (!capabilities.isArray() && 
capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue())
 {

Review Comment:
   This is going to stop the unnest cursor from being able to take advantage of 
dictionary-based processing for columns that are string arrays and could use 
it.  Why can't it use a dimension selector?  Column value selectors are, 
generally speaking, significantly slower for processing than dimension 
selectors, so continuing to use a dimension selector where possible should be a 
goal.



##########
processing/src/main/java/org/apache/druid/segment/NestedDataColumnMerger.java:
##########
@@ -86,93 +94,123 @@ public NestedDataColumnMerger(
   @Override
   public void writeMergedValueDictionary(List<IndexableAdapter> adapters) 
throws IOException
   {
+    try {
+      long dimStartTime = System.currentTimeMillis();
+
+      int numMergeIndex = 0;
+      GlobalDictionarySortedCollector sortedLookup = null;
+      final Indexed[] sortedLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
+      final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+      final Iterable<Object[]>[] sortedArrayLookups = new 
Iterable[adapters.size()];
+
+      final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> 
mergedFields = new TreeMap<>();
+
+      for (int i = 0; i < adapters.size(); i++) {
+        final IndexableAdapter adapter = adapters.get(i);
+        final GlobalDictionarySortedCollector dimValues;
+        if (adapter instanceof IncrementalIndexAdapter) {
+          dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
+        } else if (adapter instanceof QueryableIndexIndexableAdapter) {
+          dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+        } else {
+          throw new ISE("Unable to merge columns of unsupported adapter %s", 
adapter.getClass());
+        }
 
-    long dimStartTime = System.currentTimeMillis();
-
-    int numMergeIndex = 0;
-    GlobalDictionarySortedCollector sortedLookup = null;
-    final Indexed[] sortedLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedLongLookups = new Indexed[adapters.size()];
-    final Indexed[] sortedDoubleLookups = new Indexed[adapters.size()];
+        boolean allNulls = dimValues == null || 
allNull(dimValues.getSortedStrings()) &&
+                                                
allNull(dimValues.getSortedLongs()) &&
+                                                
allNull(dimValues.getSortedDoubles()) &&
+                                                
dimValues.getArrayCardinality() == 0;
+        sortedLookup = dimValues;
+        if (!allNulls) {
+          sortedLookups[i] = dimValues.getSortedStrings();
+          sortedLongLookups[i] = dimValues.getSortedLongs();
+          sortedDoubleLookups[i] = dimValues.getSortedDoubles();
+          sortedArrayLookups[i] = dimValues.getSortedArrays();
+          numMergeIndex++;
+        }
+      }
 
-    final SortedMap<String, NestedLiteralTypeInfo.MutableTypeSet> mergedFields 
= new TreeMap<>();
+      descriptorBuilder = new ColumnDescriptor.Builder();
 
-    for (int i = 0; i < adapters.size(); i++) {
-      final IndexableAdapter adapter = adapters.get(i);
-      final GlobalDictionarySortedCollector dimValues;
-      if (adapter instanceof IncrementalIndexAdapter) {
-        dimValues = 
getSortedIndexFromIncrementalAdapter((IncrementalIndexAdapter) adapter, 
mergedFields);
-      } else if (adapter instanceof QueryableIndexIndexableAdapter) {
-        dimValues = 
getSortedIndexesFromQueryableAdapter((QueryableIndexIndexableAdapter) adapter, 
mergedFields);
+      final NestedDataColumnSerializer defaultSerializer = new 
NestedDataColumnSerializer(
+          name,
+          indexSpec,
+          segmentWriteOutMedium,
+          progressIndicator,
+          closer
+      );
+      serializer = defaultSerializer;
+
+      final ComplexColumnPartSerde partSerde = 
ComplexColumnPartSerde.serializerBuilder()
+                                                                     
.withTypeName(NestedDataComplexTypeSerde.TYPE_NAME)
+                                                                     
.withDelegate(serializer)
+                                                                     .build();
+      descriptorBuilder.setValueType(ValueType.COMPLEX)
+                       .setHasMultipleValues(false)
+                       .addSerde(partSerde);
+
+      defaultSerializer.open();
+      defaultSerializer.serializeFields(mergedFields);
+
+      int stringCardinality;
+      int longCardinality;
+      int doubleCardinality;
+      int arrayCardinality;
+      if (numMergeIndex == 1) {
+        
defaultSerializer.serializeStringDictionary(sortedLookup.getSortedStrings());
+        
defaultSerializer.serializeLongDictionary(sortedLookup.getSortedLongs());
+        
defaultSerializer.serializeDoubleDictionary(sortedLookup.getSortedDoubles());
+        defaultSerializer.serializeArrayDictionary(() -> new 
ArrayDictionaryMergingIterator(
+            sortedArrayLookups,
+            defaultSerializer.getGlobalLookup()
+        ));

Review Comment:
   Why can't this one just be `sortedLookup.getSortedArrays()` like the other 3?



##########
processing/src/test/java/org/apache/druid/segment/IndexBuilder.java:
##########
@@ -267,4 +395,44 @@ private static IncrementalIndex 
buildIncrementalIndexWithRows(
     }
     return incrementalIndex;
   }
+
+  private static IncrementalIndex buildIncrementalIndexWithInputSource(
+      IncrementalIndexSchema schema,
+      InputSource inputSource,
+      InputFormat inputFormat,
+      @Nullable TransformSpec transformSpec,
+      File inputSourceTmpDir,
+      int maxRows
+  )
+  {
+    Preconditions.checkNotNull(schema, "schema");
+    Preconditions.checkNotNull(inputSource, "inputSource");
+    Preconditions.checkNotNull(inputFormat, "inputFormat");
+    Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir");
+
+    final IncrementalIndex incrementalIndex = new 
OnheapIncrementalIndex.Builder()
+        .setIndexSchema(schema)
+        .setMaxRowCount(maxRows)
+        .build();
+    TransformSpec tranformer = transformSpec != null ? transformSpec : 
TransformSpec.NONE;

Review Comment:
   spell-check: `transformer`



##########
processing/src/main/java/org/apache/druid/segment/nested/ArrayOfLiteralsFieldColumnWriter.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.nested;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
+
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+
+public class ArrayOfLiteralsFieldColumnWriter extends 
GlobalDictionaryEncodedFieldColumnWriter<int[]>
+{
+
+  protected ArrayOfLiteralsFieldColumnWriter(
+      String columnName,
+      String fieldName,
+      SegmentWriteOutMedium segmentWriteOutMedium,
+      IndexSpec indexSpec,
+      GlobalDictionaryIdLookup globalDictionaryIdLookup
+  )
+  {
+    super(columnName, fieldName, segmentWriteOutMedium, indexSpec, 
globalDictionaryIdLookup);
+  }
+
+  @Override
+  int[] processValue(int row, Object value)
+  {
+    if (value instanceof Object[]) {
+      Object[] array = (Object[]) value;
+      final int[] newIdsWhoDis = new int[array.length];
+      for (int i = 0; i < array.length; i++) {
+        if (array[i] == null) {
+          newIdsWhoDis[i] = 0;
+        } else if (array[i] instanceof String) {
+          newIdsWhoDis[i] = globalDictionaryIdLookup.lookupString((String) 
array[i]);
+        } else if (array[i] instanceof Long) {
+          newIdsWhoDis[i] = globalDictionaryIdLookup.lookupLong((Long) 
array[i]);
+        } else if (array[i] instanceof Double) {
+          newIdsWhoDis[i] = globalDictionaryIdLookup.lookupDouble((Double) 
array[i]);
+        } else {
+          newIdsWhoDis[i] = -1;
+        }
+        Preconditions.checkArgument(newIdsWhoDis[i] >= 0, "unknown global id 
[%s] for value [%s]", newIdsWhoDis[i], array[i]);
+        arrayElements.computeIfAbsent(
+            newIdsWhoDis[i],
+            (id) -> 
indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap()
+        ).add(row);
+      }
+      return newIdsWhoDis;
+    }
+    return null;

Review Comment:
   Is `null` the correct thing to return here?  Is there any way to structure 
the calling code so that the correct behavior, when it gets something that 
isn't the expected type, is to throw an exception?
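
   To make the question concrete, here is a rough, self-contained sketch of 
the stricter behavior. `StrictArrayProcessor` and its placeholder id 
assignment are made up for illustration and are not the actual writer; I am 
also assuming that a `null` value should stay `null` while any other 
non-array value is a caller bug.

```java
// Hypothetical stand-in for the writer's processValue; not the actual Druid class.
class StrictArrayProcessor
{
  // Returns null only for a null value; any other non-array value is treated
  // as a caller bug and rejected with an exception instead of a silent null.
  static int[] processValue(int row, Object value)
  {
    if (value == null) {
      return null;
    }
    if (!(value instanceof Object[])) {
      throw new IllegalArgumentException(
          String.format("row [%d]: expected Object[] but got [%s]", row, value.getClass().getName())
      );
    }
    final Object[] array = (Object[]) value;
    final int[] ids = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      // Placeholder id assignment (assumption): 0 for null, position + 1 otherwise.
      // The real code would consult the global dictionary id lookup here.
      ids[i] = array[i] == null ? 0 : i + 1;
    }
    return ids;
  }
}
```

   With something like this, a mistyped value fails loudly at write time 
instead of quietly becoming a `null` row.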



##########
processing/src/test/java/org/apache/druid/segment/data/FrontCodedIntArrayIndexedTest.java:
##########
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.data;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+
+@RunWith(Parameterized.class)
+public class FrontCodedIntArrayIndexedTest
+{
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(new Object[]{ByteOrder.LITTLE_ENDIAN}, new 
Object[]{ByteOrder.BIG_ENDIAN});
+  }
+
+  private final ByteOrder order;
+
+  public FrontCodedIntArrayIndexedTest(ByteOrder byteOrder)
+  {
+    this.order = byteOrder;
+  }
+
+  @Test
+  public void testFrontCodedIntArrayIndexed() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+    values.add(new int[]{1, 2, 3});
+    values.add(new int[]{1, 2});
+    values.add(new int[]{1, 3});
+    values.add(new int[]{1, 2, 4});
+    values.add(new int[]{1, 3, 4});
+    values.add(new int[]{1, 2, 1});
+
+    fillBuffer(buffer, values, 4);
+
+    buffer.position(0);
+    FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+
+    Iterator<int[]> indexedIterator = codedIndexed.iterator();
+    Iterator<int[]> expectedIterator = values.iterator();
+    int ctr = 0;
+    while (expectedIterator.hasNext() && indexedIterator.hasNext()) {
+      final int[] expectedNext = expectedIterator.next();
+      final int[] next = indexedIterator.next();
+      Assert.assertArrayEquals(expectedNext, next);
+      Assert.assertEquals(ctr, codedIndexed.indexOf(next));
+      ctr++;
+    }
+    Assert.assertEquals(expectedIterator.hasNext(), indexedIterator.hasNext());
+  }
+
+
+  @Test
+  public void testFrontCodedIntArrayIndexedSingleBucket() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+    values.add(new int[]{1, 2, 3});
+    values.add(new int[]{1, 2});
+    values.add(new int[]{1, 3});
+    values.add(new int[]{1, 2, 4});
+    values.add(new int[]{1, 3, 4});
+    values.add(new int[]{1, 2, 1});
+    fillBuffer(buffer, values, 16);
+
+    FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+
+    Iterator<int[]> expectedIterator = values.iterator();
+    Iterator<int[]> indexedIterator = codedIndexed.iterator();
+    int ctr = 0;
+    while (indexedIterator.hasNext() && expectedIterator.hasNext()) {
+      final int[] expectedNext = expectedIterator.next();
+      final int[] next = indexedIterator.next();
+      Assert.assertArrayEquals(expectedNext, next);
+      Assert.assertEquals(ctr, codedIndexed.indexOf(next));
+      ctr++;
+    }
+    Assert.assertEquals(expectedIterator.hasNext(), indexedIterator.hasNext());
+  }
+
+  @Test
+  public void testFrontCodedIntArrayIndexedBigger() throws IOException
+  {
+    final int sizeBase = 10000;
+    final int bucketSize = 16;
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 24).order(order);
+    for (int sizeAdjust = 0; sizeAdjust < bucketSize; sizeAdjust++) {
+      final TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+      while (values.size() < sizeBase + sizeAdjust) {
+        int length = ThreadLocalRandom.current().nextInt(10);
+        final int[] val = new int[length];
+        for (int j = 0; j < length; j++) {
+          val[j] = ThreadLocalRandom.current().nextInt(0, 10_000);
+        }
+        values.add(val);
+      }
+      fillBuffer(buffer, values, bucketSize);
+
+      FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+          buffer,
+          buffer.order()
+      ).get();
+
+      Iterator<int[]> expectedIterator = values.iterator();
+      Iterator<int[]> indexedIterator = codedIndexed.iterator();
+      int ctr = 0;
+      while (indexedIterator.hasNext() && expectedIterator.hasNext()) {
+        final int[] expectedNext = expectedIterator.next();
+        final int[] next = indexedIterator.next();
+        Assert.assertArrayEquals(expectedNext, next);
+        Assert.assertEquals(ctr, codedIndexed.indexOf(next));
+        ctr++;
+      }
+      Assert.assertEquals(expectedIterator.hasNext(), 
indexedIterator.hasNext());
+      Assert.assertEquals(ctr, sizeBase + sizeAdjust);
+    }
+  }
+
+  @Test
+  public void testFrontCodedIntArrayIndexedBiggerWithNulls() throws IOException
+  {
+    final int sizeBase = 10000;
+    final int bucketSize = 16;
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 25).order(order);
+    for (int sizeAdjust = 0; sizeAdjust < bucketSize; sizeAdjust++) {
+      TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+      values.add(null);
+      while (values.size() < sizeBase + sizeAdjust + 1) {
+        int length = ThreadLocalRandom.current().nextInt(10);
+        final int[] val = new int[length];
+        for (int j = 0; j < length; j++) {
+          val[j] = ThreadLocalRandom.current().nextInt(0, 10_000);
+        }
+        values.add(val);
+      }
+      fillBuffer(buffer, values, bucketSize);
+
+      FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+          buffer,
+          buffer.order()
+      ).get();
+
+      Iterator<int[]> expectedIterator = values.iterator();
+      Iterator<int[]> indexedIterator = codedIndexed.iterator();
+      int ctr = 0;
+      while (indexedIterator.hasNext() && expectedIterator.hasNext()) {
+        final int[] expectedNext = expectedIterator.next();
+        final int[] next = indexedIterator.next();
+        Assert.assertArrayEquals(expectedNext, next);
+        Assert.assertEquals(ctr, codedIndexed.indexOf(next));
+        ctr++;
+      }
+      Assert.assertEquals(expectedIterator.hasNext(), 
indexedIterator.hasNext());
+      Assert.assertEquals(ctr, sizeBase + sizeAdjust + 1);
+    }
+  }
+
+  @Test
+  public void testFrontCodedIntArrayIndexedIndexOf() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+    values.add(new int[]{1, 2});
+    values.add(new int[]{1, 2, 1});
+    values.add(new int[]{1, 2, 3});
+    values.add(new int[]{1, 2, 4});
+    values.add(new int[]{1, 3});
+    values.add(new int[]{1, 3, 4});
+
+    fillBuffer(buffer, values, 4);
+
+    FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+    Assert.assertEquals(-1, codedIndexed.indexOf(new int[]{1}));
+    Assert.assertEquals(0, codedIndexed.indexOf(new int[]{1, 2}));
+    Assert.assertEquals(1, codedIndexed.indexOf(new int[]{1, 2, 1}));
+    Assert.assertEquals(-3, codedIndexed.indexOf(new int[]{1, 2, 2}));
+    Assert.assertEquals(4, codedIndexed.indexOf(new int[]{1, 3}));
+    Assert.assertEquals(-7, codedIndexed.indexOf(new int[]{1, 4, 4}));
+    Assert.assertEquals(-7, codedIndexed.indexOf(new int[]{9, 1, 1}));
+  }
+
+
+  @Test
+  public void testFrontCodedIntArrayIndexedIndexOfWithNull() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+    values.add(null);
+    values.add(new int[]{1, 2});
+    values.add(new int[]{1, 2, 1});
+    values.add(new int[]{1, 2, 3});
+    values.add(new int[]{1, 2, 4});
+    values.add(new int[]{1, 3});
+    values.add(new int[]{1, 3, 4});
+    fillBuffer(buffer, values, 4);
+
+    FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+    Assert.assertEquals(0, codedIndexed.indexOf(null));
+    Assert.assertEquals(-2, codedIndexed.indexOf(new int[]{1}));
+    Assert.assertEquals(1, codedIndexed.indexOf(new int[]{1, 2}));
+    Assert.assertEquals(2, codedIndexed.indexOf(new int[]{1, 2, 1}));
+    Assert.assertEquals(-4, codedIndexed.indexOf(new int[]{1, 2, 2}));
+    Assert.assertEquals(5, codedIndexed.indexOf(new int[]{1, 3}));
+    Assert.assertEquals(-8, codedIndexed.indexOf(new int[]{1, 4, 4}));
+    Assert.assertEquals(-8, codedIndexed.indexOf(new int[]{9, 1, 1}));
+  }
+
+
+  @Test
+  public void testFrontCodedOnlyNull() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 12).order(order);
+    List<int[]> theList = Collections.singletonList(null);
+    fillBuffer(buffer, theList, 4);
+
+    buffer.position(0);
+    FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+
+    Assert.assertNull(codedIndexed.get(0));
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
codedIndexed.get(-1));
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
codedIndexed.get(theList.size()));
+
+    Assert.assertEquals(0, codedIndexed.indexOf(null));
+    Assert.assertEquals(-2, codedIndexed.indexOf(new int[]{1, 2, 3, 4}));
+
+    Iterator<int[]> iterator = codedIndexed.iterator();
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertNull(iterator.next());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testFrontCodedEmpty() throws IOException
+  {
+    ByteBuffer buffer = ByteBuffer.allocate(1 << 6).order(order);
+    List<int[]> theList = Collections.emptyList();
+    fillBuffer(buffer, theList, 4);
+
+    buffer.position(0);
+    FrontCodedIndexed codedUtf8Indexed = FrontCodedIndexed.read(
+        buffer,
+        buffer.order()
+    ).get();
+
+    Assert.assertEquals(0, codedUtf8Indexed.size());
+    Throwable t = Assert.assertThrows(IAE.class, () -> 
codedUtf8Indexed.get(0));
+    Assert.assertEquals("Index[0] >= size[0]", t.getMessage());
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
codedUtf8Indexed.get(-1));
+    Assert.assertThrows(IllegalArgumentException.class, () -> 
codedUtf8Indexed.get(theList.size()));
+
+    Assert.assertEquals(-1, codedUtf8Indexed.indexOf(null));
+    Assert.assertEquals(-1, 
codedUtf8Indexed.indexOf(StringUtils.toUtf8ByteBuffer("hello")));
+
+    Iterator<ByteBuffer> utf8Iterator = codedUtf8Indexed.iterator();
+    Assert.assertFalse(utf8Iterator.hasNext());
+  }
+
+  @Test
+  public void testBucketSizes() throws IOException
+  {
+    final int numValues = 10000;
+    final ByteBuffer buffer = ByteBuffer.allocate(1 << 25).order(order);
+    final int[] bucketSizes = new int[]{
+        1,
+        1 << 1,
+        1 << 2,
+        1 << 3,
+        1 << 4,
+        1 << 5,
+        1 << 6,
+        1 << 7
+    };
+
+    TreeSet<int[]> values = new 
TreeSet<>(FrontCodedIntArrayIndexedWriter.ARRAY_COMPARATOR);
+    values.add(null);
+    while (values.size() < numValues + 1) {
+      int length = ThreadLocalRandom.current().nextInt(10);
+      final int[] val = new int[length];
+      for (int j = 0; j < length; j++) {
+        val[j] = ThreadLocalRandom.current().nextInt(0, 10_000);
+      }
+      values.add(val);
+    }
+    for (int bucketSize : bucketSizes) {
+      fillBuffer(buffer, values, bucketSize);
+      FrontCodedIntArrayIndexed codedIndexed = FrontCodedIntArrayIndexed.read(
+          buffer,
+          buffer.order()
+      ).get();
+
+      Iterator<int[]> expectedIterator = values.iterator();
+      Iterator<int[]> iterator = codedIndexed.iterator();
+      int ctr = 0;
+      while (iterator.hasNext() && expectedIterator.hasNext()) {
+        final int[] expectedNext = expectedIterator.next();
+        final int[] next = iterator.next();
+        Assert.assertArrayEquals(expectedNext, next);
+
+        Assert.assertEquals(ctr, codedIndexed.indexOf(next));
+        ctr++;
+      }
+      Assert.assertEquals(expectedIterator.hasNext(), iterator.hasNext());
+      Assert.assertEquals(ctr, numValues + 1);
+    }
+  }
+
+  @Test
+  public void testBadBucketSize()
+  {
+    OnHeapMemorySegmentWriteOutMedium medium = new 
OnHeapMemorySegmentWriteOutMedium();
+
+    Assert.assertThrows(
+        IAE.class,
+        () -> new FrontCodedIntArrayIndexedWriter(
+            medium,
+            ByteOrder.nativeOrder(),
+            0
+        )
+    );
+
+    Assert.assertThrows(
+        IAE.class,
+        () -> new FrontCodedIntArrayIndexedWriter(
+            medium,
+            ByteOrder.nativeOrder(),
+            15
+        )
+    );
+
+    Assert.assertThrows(
+        IAE.class,
+        () -> new FrontCodedIntArrayIndexedWriter(
+            medium,
+            ByteOrder.nativeOrder(),
+            256
+        )
+    );
+  }
+
+  private static long fillBuffer(ByteBuffer buffer, Iterable<int[]> 
sortedIterable, int bucketSize) throws IOException

Review Comment:
   naming nit: how about `persistToBuffer`?  `fillBuffer` initially made me 
think that this was just filling the buffer directly, not that it was actually 
using the "normal" persist logic with the writer.



##########
processing/src/test/java/org/apache/druid/segment/IndexBuilder.java:
##########
@@ -267,4 +395,44 @@ private static IncrementalIndex 
buildIncrementalIndexWithRows(
     }
     return incrementalIndex;
   }
+
+  private static IncrementalIndex buildIncrementalIndexWithInputSource(
+      IncrementalIndexSchema schema,
+      InputSource inputSource,
+      InputFormat inputFormat,
+      @Nullable TransformSpec transformSpec,
+      File inputSourceTmpDir,
+      int maxRows
+  )
+  {
+    Preconditions.checkNotNull(schema, "schema");
+    Preconditions.checkNotNull(inputSource, "inputSource");
+    Preconditions.checkNotNull(inputFormat, "inputFormat");
+    Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir");
+
+    final IncrementalIndex incrementalIndex = new 
OnheapIncrementalIndex.Builder()
+        .setIndexSchema(schema)
+        .setMaxRowCount(maxRows)
+        .build();
+    TransformSpec tranformer = transformSpec != null ? transformSpec : 
TransformSpec.NONE;
+    InputRowSchema rowSchema = new InputRowSchema(schema.getTimestampSpec(), 
schema.getDimensionsSpec(), null);
+    InputSourceReader reader = inputSource.reader(rowSchema, inputFormat, 
inputSourceTmpDir);
+    InputSourceReader transformingReader = tranformer.decorate(reader);
+    try (CloseableIterator<InputRow> rowIterator = transformingReader.read()) {
+      while (rowIterator.hasNext()) {
+        incrementalIndex.add(rowIterator.next());

Review Comment:
   When `maxRows` is hit, are we expecting an exception?  Generally speaking, 
setting the `maxRows` on the tests is done as a way to force running queries 
against multiple segments, so I had expected to see a check for the numRows and 
incremental persists in anything that takes `maxRows`.
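
   Roughly the shape I had expected, as a self-contained sketch. 
`MaxRowsBatcher` is a made-up name, and each inner list only stands in for 
"persist the current incremental index and start a new one"; none of this is 
the real `IndexBuilder` API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of "check the row count and persist incrementally".
class MaxRowsBatcher
{
  // Consumes all rows, cutting a new batch every time maxRows is reached.
  // Each batch stands in for one persisted segment.
  static <T> List<List<T>> batch(Iterator<T> rows, int maxRows)
  {
    if (maxRows <= 0) {
      throw new IllegalArgumentException("maxRows must be positive, got [" + maxRows + "]");
    }
    final List<List<T>> persisted = new ArrayList<>();
    List<T> current = new ArrayList<>();
    while (rows.hasNext()) {
      current.add(rows.next());
      if (current.size() >= maxRows) {
        persisted.add(current); // stands in for an incremental persist
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      persisted.add(current); // final partial batch
    }
    return persisted;
  }
}
```

   Five rows with `maxRows` of 2 would end up as three batches, which is the 
multi-segment situation the tests usually want to force.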



##########
sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java:
##########
@@ -705,6 +705,23 @@ public void testQuery(
         .run();
   }
 
+  public void testQuery(
+      final String sql,
+      final Map<String, Object> queryContext,
+      final List<Query<?>> expectedQueries,
+      final List<Object[]> expectedResults,
+      final RowSignature expectedResultSignature
+  )
+  {
+    testBuilder()
+        .sql(sql)
+        .queryContext(queryContext)
+        .expectedQueries(expectedQueries)
+        .expectedResults(expectedResults)
+        .expectedSignature(expectedResultSignature)
+        .run();
+  }

Review Comment:
   Part of me wonders if, instead of adding more functions with extra arguments 
to their signatures, we should switch the call-sites to using the builder 
directly?



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -327,17 +390,58 @@ public void close()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(List<NestedPathPart> path, 
ReadableOffset readableOffset, ExtractionFn fn)
+  public DimensionSelector makeDimensionSelector(
+      List<NestedPathPart> path,
+      ReadableOffset readableOffset,
+      ExtractionFn fn
+  )
   {
     final String field = getField(path);
     Preconditions.checkNotNull(field, "Null field");
 
     if (fields.indexOf(field) >= 0) {
       DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(field).getColumn();
       return col.makeDimensionSelector(readableOffset, fn);
-    } else {
-      return DimensionSelector.constant(null);
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);

Review Comment:
   Might as well cast and assign to a `NestedPathArrayElement` here.
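
   i.e. something along these lines. This is a stand-alone sketch with 
made-up stand-in types (`PathPart`, `ArrayElement`, `Field`), not the real 
`NestedPathPart` hierarchy.

```java
import java.util.List;

// Hypothetical stand-ins for the path part types, for illustration only.
class PathCastSketch
{
  interface PathPart {}

  static final class Field implements PathPart
  {
    final String name;

    Field(String name)
    {
      this.name = name;
    }
  }

  static final class ArrayElement implements PathPart
  {
    final int index;

    ArrayElement(int index)
    {
      this.index = index;
    }
  }

  // Returns the array index of the last path part, or -1 if the path does not
  // end in an array element (-1 is just a marker chosen for this sketch).
  static int lastArrayIndex(List<PathPart> path)
  {
    if (!path.isEmpty() && path.get(path.size() - 1) instanceof ArrayElement) {
      // Cast once and keep the narrowed type, instead of holding a PathPart
      // and casting again at every use.
      final ArrayElement lastPath = (ArrayElement) path.get(path.size() - 1);
      return lastPath.index;
    }
    return -1;
  }
}
```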



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -327,17 +390,58 @@ public void close()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(List<NestedPathPart> path, 
ReadableOffset readableOffset, ExtractionFn fn)
+  public DimensionSelector makeDimensionSelector(
+      List<NestedPathPart> path,
+      ReadableOffset readableOffset,
+      ExtractionFn fn
+  )
   {
     final String field = getField(path);
     Preconditions.checkNotNull(field, "Null field");
 
     if (fields.indexOf(field) >= 0) {
       DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(field).getColumn();
       return col.makeDimensionSelector(readableOffset, fn);
-    } else {
-      return DimensionSelector.constant(null);
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {

Review Comment:
   Looking over the code, we are being very gratuitous with calls to 
`fields.indexOf()`.  My read is that in the case that this matches and we 
actually load the column, we will have binary searched for the exact same value 
a minimum of 3 times.  Once here, then twice in `readNestedFieldColumn`.  
Please refactor all of the various methods to take the actual `int` id of what 
they want to read and push the code towards a singular `indexOf()` where we 
push around the actual `int fieldId` after that.
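
   A minimal sketch of the shape I mean, using a plain sorted `String[]` as a 
stand-in for `fields`. All names here (`FieldLookupSketch`, `readColumn`, 
`makeSelector`, the `lookups` counter) are hypothetical and only exist to 
show the single lookup.

```java
import java.util.Arrays;

// Hypothetical illustration: resolve the field name to an int id once,
// then hand the id to everything downstream.
class FieldLookupSketch
{
  private final String[] sortedFields;
  int lookups = 0; // counts binary searches, only to make the point visible

  FieldLookupSketch(String... sortedFields)
  {
    this.sortedFields = sortedFields;
  }

  // The one and only binary search; negative result means "not present".
  int indexOf(String field)
  {
    lookups++;
    return Arrays.binarySearch(sortedFields, field);
  }

  // Downstream readers take the id, so they never search again.
  String readColumn(int fieldId)
  {
    return "column:" + sortedFields[fieldId];
  }

  String makeSelector(String field)
  {
    final int fieldId = indexOf(field);
    if (fieldId < 0) {
      return null; // stands in for the "nil selector" fallback
    }
    return readColumn(fieldId);
  }
}
```

   In this shape, loading a column costs exactly one search no matter how 
many helper methods sit between the caller and the read.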



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -327,17 +390,58 @@ public void close()
   }
 
   @Override
-  public DimensionSelector makeDimensionSelector(List<NestedPathPart> path, 
ReadableOffset readableOffset, ExtractionFn fn)
+  public DimensionSelector makeDimensionSelector(
+      List<NestedPathPart> path,
+      ReadableOffset readableOffset,
+      ExtractionFn fn
+  )
   {
     final String field = getField(path);
     Preconditions.checkNotNull(field, "Null field");
 
     if (fields.indexOf(field) >= 0) {
       DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(field).getColumn();
       return col.makeDimensionSelector(readableOffset, fn);
-    } else {
-      return DimensionSelector.constant(null);
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {
+        final int elementNumber = ((NestedPathArrayElement) 
lastPath).getIndex();
+        if (elementNumber < 0) {
+          throw new IAE("Cannot make array element selector for path [%s], 
negative array index not supported for this selector", path);
+        }
+        DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(arrayField).getColumn();
+        ColumnValueSelector<?> arraySelector = 
col.makeColumnValueSelector(readableOffset);
+        return new BaseSingleValueDimensionSelector()
+        {
+          @Nullable
+          @Override
+          protected String getValue()
+          {
+            Object o = arraySelector.getObject();
+            if (o instanceof Object[]) {
+              Object[] array = (Object[]) o;
+              if (elementNumber < array.length) {
+                Object element = array[elementNumber];
+                if (element == null) {
+                  return null;
+                }
+                return String.valueOf(element);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            arraySelector.inspectRuntimeShape(inspector);
+          }
+        };

Review Comment:
   We are being asked for a DimensionSelector here, why convert it to a 
ColumnValueSelector and pay the cost of grabbing objects when we could very 
easily just return a DimensionSelector?  If we are being asked for a 
`DimensionSelector` let's return a proper `DimensionSelector`. I understand 
that the interface is very String-specific right now, but let's not hobble this 
implementation just because we want to make a change to that interface at some 
point in the future.



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -349,9 +453,76 @@ public ColumnValueSelector<?> 
makeColumnValueSelector(List<NestedPathPart> path,
     if (fields.indexOf(field) >= 0) {
       BaseColumn col = getColumnHolder(field).getColumn();
       return col.makeColumnValueSelector(readableOffset);
-    } else {
-      return NilColumnValueSelector.instance();
     }
+    if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+      final NestedPathPart lastPath = path.get(path.size() - 1);
+      final String arrayField = getField(path.subList(0, path.size() - 1));
+      if (fields.indexOf(arrayField) >= 0) {
+        final int elementNumber = ((NestedPathArrayElement) 
lastPath).getIndex();
+        if (elementNumber < 0) {
+          throw new IAE("Cannot make array element selector for path [%s], 
negative array index not supported for this selector", path);
+        }
+        DictionaryEncodedColumn<?> col = (DictionaryEncodedColumn<?>) 
getColumnHolder(arrayField).getColumn();
+        ColumnValueSelector arraySelector = 
col.makeColumnValueSelector(readableOffset);

Review Comment:
   Various commentary in the `makeDimensionSelector` implementation applies 
here too.



##########
processing/src/main/java/org/apache/druid/segment/nested/GlobalDimensionDictionary.java:
##########
@@ -42,45 +47,107 @@
   private final ComparatorDimensionDictionary<String> stringDictionary;
   private final ComparatorDimensionDictionary<Long> longDictionary;
   private final ComparatorDimensionDictionary<Double> doubleDictionary;
+  private final Set<Object[]> stringArrays;
+  private final Set<Object[]> longArrays;
+  private final Set<Object[]> doubleArrays;

Review Comment:
   Why this separation by type?  Isn't it just an `array` and the ids are any 
other scalar thing in the global dictionary?



##########
processing/src/main/java/org/apache/druid/segment/nested/GlobalDictionaryEncodedFieldColumnWriter.java:
##########
@@ -231,9 +252,16 @@ public void writeTo(int finalRowCount, FileSmoosher 
smoosher) throws IOException
       @Override
       public long getSerializedSize() throws IOException
       {
+        final long arraySize;
+        if (arrayElements.size() > 0) {
+          arraySize = arrayElementDictionaryWriter.getSerializedSize() + 
arrayElementIndexWriter.getSerializedSize();
+        } else {
+          arraySize = 0;
+        }

Review Comment:
   Random thought when reading this code: should we keep track of the maximum 
length of array that we see?  It could be used to quickly filter out things 
that are accessing elementIds that are larger than anything we store.
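
   Sketching the idea, with a made-up `MaxArrayLengthTracker` (nothing like 
this exists in the PR, and where the value would be stored in the segment is 
left open):

```java
// Hypothetical illustration of tracking the longest array seen at write time.
class MaxArrayLengthTracker
{
  private int maxLength = 0;

  // Called once per written value; nulls do not affect the maximum.
  void observe(Object[] array)
  {
    if (array != null && array.length > maxLength) {
      maxLength = array.length;
    }
  }

  // Cheap pre-check at read time: an element index at or beyond the longest
  // array cannot match any row, so the reader could skip straight to nil.
  boolean canContainElement(int elementIndex)
  {
    return elementIndex >= 0 && elementIndex < maxLength;
  }
}
```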



##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -215,14 +263,24 @@ public void serializeFields(SortedMap<String, 
NestedLiteralTypeInfo.MutableTypeS
               indexSpec,
               globalDictionaryIdLookup
           );
-        } else {
+        } else if (Types.is(type, ValueType.DOUBLE)) {
           writer = new DoubleFieldColumnWriter(
               name,
               fieldFileName,
               segmentWriteOutMedium,
               indexSpec,
               globalDictionaryIdLookup
           );
+        } else if (Types.is(type, ValueType.ARRAY)) {
+          writer = new ArrayOfLiteralsFieldColumnWriter(
+              name,
+              fieldFileName,
+              segmentWriteOutMedium,
+              indexSpec,
+              globalDictionaryIdLookup
+          );
+        } else {
+          throw new IllegalArgumentException("wtf");

Review Comment:
   How about making it a `UOE` and including the type that caused us to get here :)
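
   Something along these lines, sketched with the plain JDK 
   `UnsupportedOperationException` so that it compiles on its own (in Druid this 
   would be `UOE`; the helper name and message wording are only illustrative):

```java
// Stand-in sketch: uses the JDK exception so the snippet is self-contained.
// The helper name and the message text are assumptions, not code from the PR.
final class UnsupportedTypeExample
{
  static RuntimeException unsupportedFieldType(String fieldName, Object type)
  {
    // Brackets around interpolated values make stray whitespace or empty values visible.
    return new UnsupportedOperationException(
        String.format("Unsupported type [%s] for nested field [%s]", type, fieldName)
    );
  }
}
```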



##########
processing/src/test/java/org/apache/druid/segment/IndexBuilder.java:
##########
@@ -157,27 +241,71 @@ public QueryableIndex buildMMappedIndex()
 
   public QueryableIndex buildMMappedMergedIndex()
   {
-    IndexMerger indexMerger = 
TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
     Preconditions.checkNotNull(tmpDir, "tmpDir");
-
     final List<QueryableIndex> persisted = new ArrayList<>();
-    try {
-      for (int i = 0; i < rows.size(); i += ROWS_PER_INDEX_FOR_MERGING) {
+    if (inputSource != null) {
+      Preconditions.checkNotNull(inputSource, "inputSource");
+      Preconditions.checkNotNull(inputFormat, "inputFormat");
+      Preconditions.checkNotNull(inputSourceTmpDir, "inputSourceTmpDir");
+
+      TransformSpec tranformer = transformSpec != null ? transformSpec : 
TransformSpec.NONE;

Review Comment:
   spell-check: `transformer`



##########
processing/src/main/java/org/apache/druid/segment/nested/NestedDataColumnSerializer.java:
##########
@@ -251,33 +313,78 @@ public void serializeStringDictionary(Iterable<String> 
dictionaryValues) throws
       dictionaryWriter.write(value);
       globalDictionaryIdLookup.addString(value);
     }
+    stringDictionarySerialized = true;
   }
 
   public void serializeLongDictionary(Iterable<Long> dictionaryValues) throws 
IOException
   {
+    if (!stringDictionarySerialized) {
+      throw new ISE("Must serialize string value dictionary before serializing 
long dictionary for column [%s]", name);
+    }
+    if (longDictionarySerialized) {
+      throw new ISE("Long dictionary already serialized for column [%s], 
cannot serialize again", name);
+    }
     for (Long value : dictionaryValues) {
       if (value == null) {
         continue;
       }
       longDictionaryWriter.write(value);
       globalDictionaryIdLookup.addLong(value);
     }
+    longDictionarySerialized = true;
   }
 
   public void serializeDoubleDictionary(Iterable<Double> dictionaryValues) 
throws IOException
   {
+    if (!stringDictionarySerialized) {
+      throw new ISE("Must serialize string value dictionary before serializing 
double dictionary for column [%s]", name);
+    }
+    if (!longDictionarySerialized) {
+      throw new ISE("Must serialize long value dictionary before serializing 
double dictionary for column [%s]", name);
+    }
+    if (doubleDictionarySerialized) {
+      throw new ISE("Double dictionary already serialized for column [%s], 
cannot serialize again", name);
+    }
     for (Double value : dictionaryValues) {
       if (value == null) {
         continue;
       }
       doubleDictionaryWriter.write(value);
       globalDictionaryIdLookup.addDouble(value);
     }
+    doubleDictionarySerialized = true;
+  }
+
+  public void serializeArrayDictionary(Iterable<int[]> dictionaryValues) 
throws IOException
+  {
+    if (!stringDictionarySerialized) {
+      throw new ISE("Must serialize string value dictionary before serializing 
array dictionary for column [%s]", name);
+    }
+    if (!longDictionarySerialized) {
+      throw new ISE("Must serialize long value dictionary before serializing 
array dictionary for column [%s]", name);
+    }
+    if (!doubleDictionarySerialized) {
+      throw new ISE("Must serialize double value dictionary before serializing 
array dictionary for column [%s]", name);
+    }
+    if (arrayDictionarySerialized) {
+      throw new ISE("Array dictionary already serialized for column [%s], 
cannot serialize again", name);
+    }

Review Comment:
   It seems weird to me to have a bunch of public methods that throw a bunch of 
exceptions if they aren't called in the right order.  Why does each of them need 
to be exposed as public individually if they can only be called once in a 
specific order?  Why not have a single public `serializeDictionaries` method 
that does the things in the correct order?
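
   A simplified sketch of the shape I have in mind. The class below is a 
   stand-in, not the real serializer: it only records the write order, and the 
   parameter list is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the serializer: one public entry point fixes the ordering,
// so only a single "already serialized" guard is needed.
final class DictionarySerializerSketch
{
  private final List<String> writeOrder = new ArrayList<>();
  private boolean dictionariesSerialized = false;

  public void serializeDictionaries(
      Iterable<String> strings,
      Iterable<Long> longs,
      Iterable<Double> doubles,
      Iterable<int[]> arrays
  )
  {
    if (dictionariesSerialized) {
      throw new IllegalStateException("Dictionaries already serialized, cannot serialize again");
    }
    // The order is enforced by construction: strings, longs, doubles, then arrays.
    for (String value : strings) {
      writeOrder.add("string");
    }
    for (Long value : longs) {
      if (value != null) {
        writeOrder.add("long");
      }
    }
    for (Double value : doubles) {
      if (value != null) {
        writeOrder.add("double");
      }
    }
    for (int[] value : arrays) {
      writeOrder.add("array");
    }
    dictionariesSerialized = true;
  }

  // Exposed only so the recorded order can be inspected in this sketch.
  List<String> getWriteOrder()
  {
    return writeOrder;
  }
}
```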



##########
sql/src/main/java/org/apache/druid/sql/calcite/expression/builtin/ArrayContainsOperatorConversion.java:
##########
@@ -95,7 +95,7 @@ public DimFilter toDruidFilter(
     final DruidExpression leftExpr = druidExpressions.get(0);
     final DruidExpression rightExpr = druidExpressions.get(1);
 
-    if (leftExpr.isSimpleExtraction()) {
+    if (leftExpr.isSimpleExtraction() && !(leftExpr.getDruidType() != null && 
leftExpr.getDruidType().isArray())) {

Review Comment:
   What if it's an `array_contains()` over just a normal single-valued `String` 
column?  Shouldn't that also match the filter, pretending that each row 
contains an array of size 1?
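
   To illustrate the semantics I mean, here is a toy sketch (plain Java, not 
   Druid filter code; the method name and the use of `List` for array values are 
   assumptions) where a scalar row value is treated as a one-element array:

```java
import java.util.Collections;
import java.util.List;

// Toy model of array_contains(column, required): true when the row's array
// holds every required element. A scalar row is viewed as an array of size 1.
final class ArrayContainsSketch
{
  static boolean arrayContains(Object rowValue, List<String> required)
  {
    final List<?> asArray = rowValue instanceof List
                            ? (List<?>) rowValue
                            : Collections.singletonList(rowValue);
    return asArray.containsAll(required);
  }
}
```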



##########
processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java:
##########
@@ -509,50 +507,6 @@ public void createIndex(
     }
   }
 
-  public void createIndex(
-      InputStream inputDataStream,
-      String parserJson,
-      String transformSpecJson,
-      String aggregators,
-      File outDir,
-      long minTimestamp,
-      Granularity gran,
-      int maxRowCount,
-      boolean rollup
-  ) throws Exception
-  {
-    try {
-      StringInputRowParser parser = mapper.readValue(parserJson, 
StringInputRowParser.class);
-      TransformSpec transformSpec;
-      if (transformSpecJson != null) {
-        transformSpec = mapper.readValue(transformSpecJson, 
TransformSpec.class);
-        parser = new TransformingStringInputRowParser(parser.getParseSpec(), 
parser.getEncoding(), transformSpec);
-      }
-
-      LineIterator iter = IOUtils.lineIterator(inputDataStream, "UTF-8");
-      List<AggregatorFactory> aggregatorSpecs = mapper.readValue(
-          aggregators,
-          new TypeReference<List<AggregatorFactory>>()
-          {
-          }
-      );
-
-      createIndex(
-          iter,
-          parser,
-          aggregatorSpecs.toArray(new AggregatorFactory[0]),
-          outDir,
-          minTimestamp,
-          gran,
-          true,
-          maxRowCount,
-          rollup
-      );
-    }
-    finally {
-      Closeables.close(inputDataStream, true);
-    }
-  }

Review Comment:
   Removing this method could quite easily break extensions.  IIRC, the 
`AggregationTestHelper` exists primarily to aid in building tests for 
AggregatorFactory extensions.  Did the same logic move somewhere else? If so, 
can we keep this method (maybe deprecated?) and have it call that other place?
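
   Roughly this pattern, sketched with made-up signatures (the real parameter 
   lists are much longer, and the new entry point here is hypothetical):

```java
// Illustrative only: both signatures are simplified stand-ins.
final class TestHelperSketch
{
  String lastCall;

  // Hypothetical new entry point that the logic moved to.
  void createIndex(String data, String format, String transformSpecJson)
  {
    lastCall = data + "|" + format + "|" + transformSpecJson;
  }

  /**
   * @deprecated kept so that existing extension tests keep compiling;
   * delegates to the new entry point.
   */
  @Deprecated
  void createIndex(String data, String parserJson)
  {
    createIndex(data, parserJson, null);
  }
}
```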



##########
processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java:
##########
@@ -426,6 +765,12 @@ public ColumnIndexSupplier 
getColumnIndexSupplier(List<NestedPathPart> path)
   {
     final String field = getField(path);
     if (fields.indexOf(field) < 0) {
+      if (!path.isEmpty() && path.get(path.size() - 1) instanceof 
NestedPathArrayElement) {
+        final String arrayField = getField(path.subList(0, path.size() - 1));
+        if (fields.indexOf(arrayField) >= 0) {
+          return NoIndexesColumnIndexSupplier.getInstance();
+        }

Review Comment:
   I'm assuming this is no indexes because we cannot force it to also have a 
value matcher along with using a bitmap index?  If so, it's sad that we cannot 
use indexes in this case.  We have indexes that are extremely useful here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

