imply-cheddar commented on code in PR #13268:
URL: https://github.com/apache/druid/pull/13268#discussion_r1022115086


##########
processing/src/main/java/org/apache/druid/query/UnnestDataSource.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.UnnestSegmentReference;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+public class UnnestDataSource implements DataSource
+{
+  private final DataSource base;
+  private final String column;
+  private final String outputName;
+  private final LinkedHashSet<String> allowList;
+
+
+  private UnnestDataSource(
+      DataSource dataSource,
+      String columnName,
+      String outputName,
+      LinkedHashSet<String> allowList
+  )
+  {
+    this.base = dataSource;
+    this.column = columnName;
+    this.outputName = outputName;
+    this.allowList = allowList;
+  }
+
+  /**
+   * Create an unnest dataSource from a string condition.

Review Comment:
   What is this comment trying to tell me?



##########
processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java:
##########
@@ -125,6 +126,11 @@ public static DataSourceAnalysis forDataSource(final 
DataSource dataSource)
       current = subQuery.getDataSource();
     }
 
+    while (current instanceof UnnestDataSource) {

Review Comment:
   If there is a Query of an Unnest of a Query of an Unnest, the way that you 
have interleaved these is not going to completely unwrap the objects as 
expected.
   
   This DataSourceAnalysis thing is probably another thing to move onto the 
DataSource object itself...  Not sure if we should do that now or leave it as 
something to do for later though.  either way, you need both conditions (check 
for Query and check for Unnest) on the while loop above.



##########
processing/src/main/java/org/apache/druid/query/UnnestDataSource.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.UnnestSegmentReference;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+public class UnnestDataSource implements DataSource
+{
+  private final DataSource base;
+  private final String column;
+  private final String outputName;
+  private final LinkedHashSet<String> allowList;
+
+
+  private UnnestDataSource(
+      DataSource dataSource,
+      String columnName,
+      String outputName,
+      LinkedHashSet<String> allowList
+  )
+  {
+    this.base = dataSource;
+    this.column = columnName;
+    this.outputName = outputName;
+    this.allowList = allowList;
+  }
+
+  /**
+   * Create an unnest dataSource from a string condition.
+   */
+  @JsonCreator
+  public static UnnestDataSource create(
+      @JsonProperty("base") DataSource base,
+      @JsonProperty("column") String columnName,
+      @JsonProperty("outputName") String outputName,
+      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
+  )
+  {
+    return new UnnestDataSource(base, columnName, outputName, allowList);
+  }
+
+  @JsonProperty("base")
+  public DataSource getBase()
+  {
+    return base;
+  }
+
+  @JsonProperty("column")
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty("outputName")
+  public String getOutputName()
+  {
+    return outputName;
+  }
+
+  @JsonProperty("allowList")
+  public LinkedHashSet<String> getAllowList()
+  {
+    return allowList;
+  }
+
+  @Override
+  public Set<String> getTableNames()
+  {
+    return base.getTableNames();
+  }
+
+  @Override
+  public List<DataSource> getChildren()
+  {
+    return ImmutableList.of(base);
+  }
+
+  @Override
+  public DataSource withChildren(List<DataSource> children)
+  {
+    if (children.size() != 1) {
+      throw new IAE("Expected [1] child, got [%d]", children.size());
+    }
+    return new UnnestDataSource(children.get(0), column, outputName, 
allowList);
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return base.isCacheable(isBroker);
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return base.isGlobal();
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return base.isConcrete();
+  }
+
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTimeAccumulator
+  )
+  {
+    return JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> {
+          if (column == null) {
+            return Function.identity();
+          } else if (column.isEmpty()) {
+            return Function.identity();
+          } else {
+            return baseSegment ->
+                new UnnestSegmentReference(
+                    baseSegment,
+                    column,
+                    outputName,
+                    allowList
+                );
+          }
+        }
+    );

Review Comment:
   This code doesn't seem to be delegating to its child, do you have any tests 
that test for, e.g. nesting of these things?



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Double.valueOf((String) value);
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Float.valueOf((String) value);
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Long.valueOf((String) value);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            
baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector);
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return getObject() == null;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!unnestListForCurrentRow.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return unnestListForCurrentRow.get(index);
+              } else if (allowSet.contains((String) 
unnestListForCurrentRow.get(index))) {
+                return unnestListForCurrentRow.get(index);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public Class classOfObject()
+          {
+            return Object.class;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(columnName)) {
+          baseColumSelectorFactory.getColumnCapabilities(column);
+        }
+        return baseColumSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && baseCursor.isDone() == false) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && baseCursor.isDoneOrInterrupted() == false) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  @Override
+  public void initialize()
+  {
+    if (columnValueSelector != null) {

Review Comment:
   Why would columnValueSelector ever be null?



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )

Review Comment:
   Here too, this object should depend on the dimensionSelector and expect that 
the thing that decided to call it already has a good dimension selector for it.



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory();
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        final DimensionSpec actualDimensionSpec = 
dimensionSpec.withDimension(columnName);
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return dimSelector.getRow();

Review Comment:
   I expected to see flattening here, but I'm not.  Are you sure that this code 
is working as expected?  There's a chance that the tests aren't actually 
validating this as, without code here, I don't believe that this will actually 
unnest the array.



##########
processing/src/main/java/org/apache/druid/segment/UnnestCursor.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+/**
+ * UnnestCursor is an abstraction over cursors for unnest.
+ * There are two types of cursors :
+ * -- DimensionUnnestCursor which is called if the column is disctionary 
encoded
+ * -- ColumnarValueCursor otherwise
+ */
+public interface UnnestCursor extends Cursor

Review Comment:
   I don't believe this interface adds anything.  You don't store references to 
objects of this interface, the interface appears to just be trying to create 
definitions for the internal implementation details of the cursors you are 
creating.  Those classes can already access their own internal private methods, 
so this interface isn't actually needed, just delete it.  If you want to keep 
the javadoc, move it to the methods on the concrete classes.



##########
processing/src/test/java/org/apache/druid/segment/ListCursor.java:
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ListCursor implements Cursor
+{
+  public static int LIMIT = 2;
+  List<Object> baseList;
+  private int index;
+
+  public ListCursor()
+  {
+    this.index = 0;
+    populateList();
+  }
+
+  public ListCursor(List<Object> inputList)
+  {
+    this.baseList = inputList;
+  }
+
+  void populateList()

Review Comment:
   I think what you want is a static method that builds a ListCursor rather 
than a method that can get called at any point in time to mutate and changes 
the internals of the `ListCursor` object.



##########
processing/src/test/java/org/apache/druid/segment/UnnestStorageAdapterTest.java:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.utils.CloseableUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class UnnestStorageAdapterTest extends InitializedNullHandlingTest
+{
+  private static Closer CLOSER;
+  private static IncrementalIndex INCREMENTAL_INDEX;
+  private static IncrementalIndexStorageAdapter 
INCREMENTAL_INDEX_STORAGE_ADAPTER;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER;
+  private static UnnestStorageAdapter UNNEST_STORAGE_ADAPTER1;
+  private static List<StorageAdapter> ADAPTERS;
+  private static String COLUMNNAME = "multi-string1";
+  private static String OUTPUT_COLUMN_NAME = "unnested-multi-string1";
+  private static LinkedHashSet<String> IGNORE_SET = new 
LinkedHashSet<>(Arrays.asList("1", "5", "100"));
+
+  @BeforeClass
+  public static void setup()
+  {
+    CLOSER = Closer.create();
+    final GeneratorSchemaInfo schemaInfo = 
GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               
.interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new 
LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+    final SegmentGenerator segmentGenerator = CLOSER.register(new 
SegmentGenerator());
+
+    final int numRows = 10_000;
+    INCREMENTAL_INDEX = CLOSER.register(
+        segmentGenerator.generateIncrementalIndex(dataSegment, schemaInfo, 
Granularities.HOUR, numRows)
+    );
+    INCREMENTAL_INDEX_STORAGE_ADAPTER = new 
IncrementalIndexStorageAdapter(INCREMENTAL_INDEX);
+    UNNEST_STORAGE_ADAPTER = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        null
+    );
+    UNNEST_STORAGE_ADAPTER1 = new UnnestStorageAdapter(
+        INCREMENTAL_INDEX_STORAGE_ADAPTER,
+        COLUMNNAME,
+        OUTPUT_COLUMN_NAME,
+        IGNORE_SET
+    );
+    ADAPTERS = ImmutableList.of(UNNEST_STORAGE_ADAPTER);
+  }
+
+  @AfterClass
+  public static void teardown()
+  {
+    CloseableUtils.closeAndSuppressExceptions(CLOSER, throwable -> {
+    });
+  }
+
+  @Test
+  public void test_unnest_adapter_methods()
+  {
+    String colName = "multi-string1";
+    for (StorageAdapter adapter : ADAPTERS) {
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:00:00.000Z"),
+          adapter.getMaxTime()
+      );
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T00:00:00.000Z"),
+          adapter.getMinTime()
+      );
+      adapter.getColumnCapabilities(colName);
+      Assert.assertEquals(adapter.getNumRows(), 0);
+      Assert.assertNotNull(adapter.getMetadata());
+      Assert.assertEquals(
+          DateTimes.of("2000-01-01T23:59:59.999Z"),
+          adapter.getMaxIngestedEventTime()
+      );
+      Assert.assertEquals(
+          adapter.getColumnCapabilities(colName).toColumnType(),
+          
INCREMENTAL_INDEX_STORAGE_ADAPTER.getColumnCapabilities(colName).toColumnType()
+      );
+      Assert.assertEquals(UNNEST_STORAGE_ADAPTER.getDimensionToUnnest(), 
colName);
+    }
+  }
+
+  @Test
+  public void test_unnest_adapters_basic()
+  {
+    final String columnName = "multi-string1";
+
+    for (StorageAdapter adapter : ADAPTERS) {
+      Sequence<Cursor> cursorSequence = adapter.makeCursors(
+          null,
+          adapter.getInterval(),
+          VirtualColumns.EMPTY,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+
+      cursorSequence.accumulate(null, (accumulated, cursor) -> {
+        ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+        DimensionSelector dimSelector = 
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+        ColumnValueSelector valueSelector = 
factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
+
+
+        while (!cursor.isDone()) {
+          Object dimSelectorVal = dimSelector.getObject();
+          Object valueSelectorVal = valueSelector.getObject();
+          if (dimSelectorVal == null) {
+            Assert.assertNull(dimSelectorVal);
+          } else if (valueSelectorVal == null) {
+            Assert.assertNull(valueSelectorVal);
+          }
+          cursor.advance();
+        }
+        return null;
+      });
+    }
+  }
+
+  @Test
+  public void test_unnest_adapters_allowList()
+  {
+    final String columnName = "multi-string1";
+
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = 
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+      ColumnValueSelector valueSelector = 
factory.makeColumnValueSelector(OUTPUT_COLUMN_NAME);
+
+
+      while (!cursor.isDone()) {
+        Object dimSelectorVal = dimSelector.getObject();
+        Object valueSelectorVal = valueSelector.getObject();
+        if (dimSelectorVal == null) {
+          Assert.assertNull(dimSelectorVal);
+        } else if (valueSelectorVal == null) {
+          Assert.assertNull(valueSelectorVal);
+        }
+        cursor.advance();
+      }
+      return null;
+    });
+  }
+
+  @Test
+  public void test_dim_unnest_adapters()
+  {
+    final String columnName = "multi-string1";
+
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
+    Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
+    Assert.assertEquals(
+        
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
+        ColumnCapabilities.Capable.TRUE
+    );
+    Assert.assertEquals(adapter.getMaxValue(columnName), 
adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(columnName), 
adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = 
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+
+
+      while (!cursor.isDone()) {
+        Object dimSelectorVal = dimSelector.getObject();
+        if (dimSelectorVal == null) {
+          Assert.assertNull(dimSelectorVal);
+        }
+        cursor.advance();
+      }
+      return null;
+    });
+  }
+
+  @Test
+  public void test_dim_unnest_adapters_methods()
+  {
+    final String columnName = "multi-string1";
+
+    Sequence<Cursor> cursorSequence = UNNEST_STORAGE_ADAPTER1.makeCursors(
+        null,
+        UNNEST_STORAGE_ADAPTER1.getInterval(),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+    UnnestStorageAdapter adapter = UNNEST_STORAGE_ADAPTER1;
+    Assert.assertEquals(adapter.getDimensionToUnnest(), columnName);
+    Assert.assertEquals(
+        
adapter.getColumnCapabilities(OUTPUT_COLUMN_NAME).isDictionaryEncoded(),
+        ColumnCapabilities.Capable.TRUE
+    );
+    Assert.assertEquals(adapter.getMaxValue(columnName), 
adapter.getMaxValue(OUTPUT_COLUMN_NAME));
+    Assert.assertEquals(adapter.getMinValue(columnName), 
adapter.getMinValue(OUTPUT_COLUMN_NAME));
+
+    cursorSequence.accumulate(null, (accumulated, cursor) -> {
+      ColumnSelectorFactory factory = cursor.getColumnSelectorFactory();
+
+      DimensionSelector dimSelector = 
factory.makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+
+
+      while (!cursor.isDone()) {
+        Object dimSelectorVal = dimSelector.getObject();
+        Assert.assertNotNull(dimSelector.getRow());
+        Assert.assertNotNull(dimSelector.getValueCardinality());
+        Assert.assertNotNull(dimSelector.makeValueMatcher(OUTPUT_COLUMN_NAME));
+        Assert.assertNotNull(dimSelector.idLookup());
+        Assert.assertNotNull(dimSelector.lookupName(0));
+        Assert.assertNotNull(dimSelector.defaultGetObject());
+        Assert.assertFalse(dimSelector.isNull());
+        if (dimSelectorVal == null) {
+          Assert.assertNull(dimSelectorVal);
+        }

Review Comment:
   The assertions here should be updated.  You should be able to know and 
validate the sepcific ValueCardinality and this seems to always be looking up 
the value for the 0 index, which shouldn't be correct.  If the test isn't 
actually walking through rows with different dictionary values, it's not really 
validating what we need it to.



##########
processing/src/main/java/org/apache/druid/query/UnnestDataSource.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.UnnestSegmentReference;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+public class UnnestDataSource implements DataSource
+{
+  private final DataSource base;
+  private final String column;
+  private final String outputName;
+  private final LinkedHashSet<String> allowList;
+
+
+  private UnnestDataSource(
+      DataSource dataSource,
+      String columnName,
+      String outputName,
+      LinkedHashSet<String> allowList
+  )
+  {
+    this.base = dataSource;
+    this.column = columnName;
+    this.outputName = outputName;
+    this.allowList = allowList;
+  }
+
+  /**
+   * Create an unnest dataSource from a string condition.
+   */
+  @JsonCreator
+  public static UnnestDataSource create(
+      @JsonProperty("base") DataSource base,
+      @JsonProperty("column") String columnName,
+      @JsonProperty("outputName") String outputName,
+      @Nullable @JsonProperty("allowList") LinkedHashSet<String> allowList
+  )
+  {
+    return new UnnestDataSource(base, columnName, outputName, allowList);
+  }
+
+  @JsonProperty("base")
+  public DataSource getBase()
+  {
+    return base;
+  }
+
+  @JsonProperty("column")
+  public String getColumn()
+  {
+    return column;
+  }
+
+  @JsonProperty("outputName")
+  public String getOutputName()
+  {
+    return outputName;
+  }
+
+  @JsonProperty("allowList")
+  public LinkedHashSet<String> getAllowList()
+  {
+    return allowList;
+  }
+
+  @Override
+  public Set<String> getTableNames()
+  {
+    return base.getTableNames();
+  }
+
+  @Override
+  public List<DataSource> getChildren()
+  {
+    return ImmutableList.of(base);
+  }
+
+  @Override
+  public DataSource withChildren(List<DataSource> children)
+  {
+    if (children.size() != 1) {
+      throw new IAE("Expected [1] child, got [%d]", children.size());
+    }
+    return new UnnestDataSource(children.get(0), column, outputName, 
allowList);
+  }
+
+  @Override
+  public boolean isCacheable(boolean isBroker)
+  {
+    return base.isCacheable(isBroker);
+  }
+
+  @Override
+  public boolean isGlobal()
+  {
+    return base.isGlobal();
+  }
+
+  @Override
+  public boolean isConcrete()
+  {
+    return base.isConcrete();
+  }
+
+  @Override
+  public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
+      Query query,
+      AtomicLong cpuTimeAccumulator
+  )
+  {
+    return JvmUtils.safeAccumulateThreadCpuTime(
+        cpuTimeAccumulator,
+        () -> {
+          if (column == null) {
+            return Function.identity();
+          } else if (column.isEmpty()) {
+            return Function.identity();
+          } else {
+            return baseSegment ->
+                new UnnestSegmentReference(
+                    baseSegment,
+                    column,
+                    outputName,
+                    allowList
+                );
+          }
+        }
+    );
+
+  }
+
+  @Override
+  public DataSource withUpdatedDataSource(DataSource newSource)
+  {
+    return null;

Review Comment:
   Is this never called?  If it is, my guess is that it will produce an NPE.  
Maybe include a comment about why it is safe to do this?



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )

Review Comment:
   It should be safe to pass in the baseColumnSelectorFactory directly.  Once 
you've made the decision to use this object, you should already have a good 
column selector factory to use.



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Double.valueOf((String) value);
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Float.valueOf((String) value);
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Long.valueOf((String) value);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            
baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector);
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return getObject() == null;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!unnestListForCurrentRow.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return unnestListForCurrentRow.get(index);
+              } else if (allowSet.contains((String) 
unnestListForCurrentRow.get(index))) {
+                return unnestListForCurrentRow.get(index);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public Class classOfObject()
+          {
+            return Object.class;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(columnName)) {
+          baseColumSelectorFactory.getColumnCapabilities(column);
+        }
+        return baseColumSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && baseCursor.isDone() == false) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && baseCursor.isDoneOrInterrupted() == false) {

Review Comment:
   `==` on a boolean thing is a bit weird?  Just 
`!baseCursor.isDoneOrInterrupted()`?



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);

Review Comment:
   You are not intercepting the calls for the dimension from here, you will 
need to intercept them.



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Double.valueOf((String) value);
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Float.valueOf((String) value);
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Long.valueOf((String) value);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            
baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector);

Review Comment:
   Why can't you reuse the object from the constructor?



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Double.valueOf((String) value);
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Float.valueOf((String) value);
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Long.valueOf((String) value);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            
baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector);
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return getObject() == null;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!unnestListForCurrentRow.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return unnestListForCurrentRow.get(index);
+              } else if (allowSet.contains((String) 
unnestListForCurrentRow.get(index))) {
+                return unnestListForCurrentRow.get(index);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public Class classOfObject()
+          {
+            return Object.class;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(columnName)) {
+          baseColumSelectorFactory.getColumnCapabilities(column);
+        }
+        return baseColumSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && baseCursor.isDone() == false) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && baseCursor.isDoneOrInterrupted() == false) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  @Override
+  public void initialize()
+  {
+    if (columnValueSelector != null) {
+      this.currentVal = this.columnValueSelector.getObject();
+      this.unnestListForCurrentRow = new ArrayList<>();
+      if (currentVal == null) {
+        unnestListForCurrentRow = new ArrayList<>();
+        unnestListForCurrentRow.add(null);
+      } else {
+        if (currentVal instanceof List) {
+          unnestListForCurrentRow = (List<Object>) currentVal;
+        } else if (currentVal.getClass().equals(String.class)) {
+          unnestListForCurrentRow.add(currentVal);

Review Comment:
   Adding more and more things to the list is going to effectively mean that 
you are keeping a reference to all of the values in memory as you go through 
the processing.  This is effectively a memory leak and I'm not sure it's 
necessary?
   
   While it might seem ugly, I think you would be better off with a reference 
for a single Object and a reference for a List<> and then actually having logic 
that decides which one it is "iterating" over.



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory();
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        final DimensionSpec actualDimensionSpec = 
dimensionSpec.withDimension(columnName);
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return dimSelector.getRow();
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                return 
lookupName(indexedIntsForCurrentRow.get(index)).equals(value);
+              }
+
+              @Override
+              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+              {
+                
baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector);
+              }
+            };
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+          {
+            return DimensionSelectorUtils.makeValueMatcherGeneric(this, 
predicate);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            baseCursor.getColumnSelectorFactory()
+                      .makeDimensionSelector(actualDimensionSpec)
+                      .inspectRuntimeShape(inspector);
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (indexedIntsForCurrentRow != null) {
+              if (allowedBitSet.isEmpty()) {

Review Comment:
   I don't see anywhere that `allowedBitSet` is getting mutated, why would it 
ever be non-empty?



##########
processing/src/main/java/org/apache/druid/segment/ColumnarValueUnnestCursor.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+
+public class ColumnarValueUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final ColumnSelectorFactory baseColumSelectorFactory;
+  private final ColumnValueSelector columnValueSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private int index;
+  private Object currentVal;
+  private List<Object> unnestListForCurrentRow;
+  private boolean needInitialization;
+
+  public ColumnarValueUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumSelectorFactory = cursor.getColumnSelectorFactory();
+    this.columnValueSelector = 
this.baseColumSelectorFactory.makeColumnValueSelector(columnName);
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        return baseColumSelectorFactory.makeDimensionSelector(dimensionSpec);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String columnName)
+      {
+        if (!outputName.equals(columnName)) {
+          return baseColumSelectorFactory.makeColumnValueSelector(columnName);
+        }
+        return new ColumnValueSelector()
+        {
+          @Override
+          public double getDouble()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Double.valueOf((String) value);
+          }
+
+          @Override
+          public float getFloat()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Float.valueOf((String) value);
+          }
+
+          @Override
+          public long getLong()
+          {
+            Object value = getObject();
+            if (value == null) {
+              return 0;
+            }
+            return Long.valueOf((String) value);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            
baseColumSelectorFactory.makeColumnValueSelector(columnName).inspectRuntimeShape(inspector);
+          }
+
+          @Override
+          public boolean isNull()
+          {
+            return getObject() == null;
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (!unnestListForCurrentRow.isEmpty()) {
+              if (allowSet == null || allowSet.isEmpty()) {
+                return unnestListForCurrentRow.get(index);
+              } else if (allowSet.contains((String) 
unnestListForCurrentRow.get(index))) {
+                return unnestListForCurrentRow.get(index);
+              }
+            }
+            return null;
+          }
+
+          @Override
+          public Class classOfObject()
+          {
+            return Object.class;
+          }
+        };
+      }
+
+      @Nullable
+      @Override
+      public ColumnCapabilities getColumnCapabilities(String column)
+      {
+        if (!outputName.equals(columnName)) {
+          baseColumSelectorFactory.getColumnCapabilities(column);
+        }
+        return baseColumSelectorFactory.getColumnCapabilities(columnName);
+      }
+    };
+  }
+
+  @Override
+  public DateTime getTime()
+  {
+    return baseCursor.getTime();
+  }
+
+  @Override
+  public void advance()
+  {
+    advanceUninterruptibly();
+    BaseQuery.checkInterrupted();
+  }
+
+  @Override
+  public void advanceUninterruptibly()
+  {
+    do {
+      advanceAndUpdate();
+    } while (matchAndProceed());
+  }
+
+  @Override
+  public boolean isDone()
+  {
+    if (needInitialization && baseCursor.isDone() == false) {
+      initialize();
+    }
+    return baseCursor.isDone();
+  }
+
+  @Override
+  public boolean isDoneOrInterrupted()
+  {
+    if (needInitialization && baseCursor.isDoneOrInterrupted() == false) {
+      initialize();
+    }
+    return baseCursor.isDoneOrInterrupted();
+  }
+
+  @Override
+  public void reset()
+  {
+    index = 0;
+    needInitialization = true;
+    baseCursor.reset();
+  }
+
+  @Override
+  public void initialize()
+  {
+    if (columnValueSelector != null) {
+      this.currentVal = this.columnValueSelector.getObject();
+      this.unnestListForCurrentRow = new ArrayList<>();
+      if (currentVal == null) {
+        unnestListForCurrentRow = new ArrayList<>();
+        unnestListForCurrentRow.add(null);
+      } else {
+        if (currentVal instanceof List) {
+          unnestListForCurrentRow = (List<Object>) currentVal;
+        } else if (currentVal.getClass().equals(String.class)) {
+          unnestListForCurrentRow.add(currentVal);
+        }
+      }
+      if (allowSet != null) {
+        if (!allowSet.isEmpty()) {
+          if (!allowSet.contains((String) unnestListForCurrentRow.get(index))) 
{
+            advance();
+          }
+        }
+      }
+    }
+    needInitialization = false;
+  }
+
+  @Override
+  public void advanceAndUpdate()

Review Comment:
   The logic of `initialize` and the logic of `advanceAndUpdate` seem very, 
very similar to me.  Is there a reason that the initialization cannot just be a 
pre-emptive `advanceAndUpdate`?



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory();
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        final DimensionSpec actualDimensionSpec = 
dimensionSpec.withDimension(columnName);
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return dimSelector.getRow();
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                return 
lookupName(indexedIntsForCurrentRow.get(index)).equals(value);
+              }
+
+              @Override
+              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+              {
+                
baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector);

Review Comment:
   Is there a reason we cannot reuse the object from the constructor?



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory();
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        final DimensionSpec actualDimensionSpec = 
dimensionSpec.withDimension(columnName);
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return dimSelector.getRow();
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                return 
lookupName(indexedIntsForCurrentRow.get(index)).equals(value);

Review Comment:
   This is going to lookup and build a String for every row that is processed.  
It completely negates the benefits of having a dictionary and actually makes 
the dictionary processed stuff slower.  The dictionary has already converted 
teh string into an integer, you can lookup the dictionary id and then compare 
against that without ever building a String, do that instead.



##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+public class UnnestStorageAdapter implements StorageAdapter
+{
+  private final StorageAdapter baseAdapter;
+  private final String dimensionToUnnest;
+  private final String outputColumnName;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestStorageAdapter(
+      final StorageAdapter baseAdapter,
+      final String dimension,
+      final String outputColumnName,
+      final LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseAdapter = baseAdapter;
+    this.dimensionToUnnest = dimension;
+    this.outputColumnName = outputColumnName;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable Filter filter,
+      Interval interval,
+      VirtualColumns virtualColumns,
+      Granularity gran,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    Filter updatedFilter;
+    final InDimFilter allowListFilters;
+    if (allowSet != null && !allowSet.isEmpty()) {
+      allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
+      if (filter != null) {
+        updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
+      } else {
+        updatedFilter = allowListFilters;
+      }
+    } else {
+      updatedFilter = filter;
+    }
+    final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
+        updatedFilter,
+        interval,
+        virtualColumns,
+        gran,
+        descending,
+        queryMetrics
+    );
+
+    return Sequences.map(
+        baseCursorSequence,
+        cursor -> {
+          assert cursor != null;

Review Comment:
   No asserts in code, either you actually want to validate the thing and 
should be checking it or you trust that the code is doing what you expect and 
don't need to check it.  Asserts slow things down and don't actually help in 
production (I know that people will tell you that asserts don't run in 
production code, but profiling disagrees)



##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+public class UnnestStorageAdapter implements StorageAdapter
+{
+  private final StorageAdapter baseAdapter;
+  private final String dimensionToUnnest;
+  private final String outputColumnName;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestStorageAdapter(
+      final StorageAdapter baseAdapter,
+      final String dimension,
+      final String outputColumnName,
+      final LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseAdapter = baseAdapter;
+    this.dimensionToUnnest = dimension;
+    this.outputColumnName = outputColumnName;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable Filter filter,
+      Interval interval,
+      VirtualColumns virtualColumns,
+      Granularity gran,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    Filter updatedFilter;
+    final InDimFilter allowListFilters;
+    if (allowSet != null && !allowSet.isEmpty()) {
+      allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
+      if (filter != null) {
+        updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
+      } else {
+        updatedFilter = allowListFilters;
+      }
+    } else {
+      updatedFilter = filter;
+    }
+    final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
+        updatedFilter,
+        interval,
+        virtualColumns,
+        gran,
+        descending,
+        queryMetrics
+    );
+
+    return Sequences.map(
+        baseCursorSequence,
+        cursor -> {
+          assert cursor != null;
+          Cursor retVal = cursor;
+          UnnestCursor retUnnestCursor;
+          ColumnCapabilities capabilities = 
cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
+          if (capabilities.isDictionaryEncoded() == 
ColumnCapabilities.Capable.TRUE
+              && capabilities.areDictionaryValuesUnique() == 
ColumnCapabilities.Capable.TRUE) {
+            retUnnestCursor = new DimensionUnnestCursor(retVal, 
dimensionToUnnest, outputColumnName, allowSet);
+          } else {
+            retUnnestCursor = new ColumnarValueUnnestCursor(retVal, 
dimensionToUnnest, outputColumnName, allowSet);
+          }
+          return retUnnestCursor;
+        }
+    );
+  }
+
+  @Override
+  public Interval getInterval()
+  {
+    return baseAdapter.getInterval();
+  }
+
+  @Override
+  public Indexed<String> getAvailableDimensions()
+  {
+    final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>();
+
+    for (String dim : baseAdapter.getAvailableDimensions()) {
+      availableDimensions.add(dim);
+    }
+    // check to see if output name provided is already
+    // a part of available dimensions
+    if (availableDimensions.contains(outputColumnName)) {
+      throw new IAE(
+          "Provided output name [%s] already exists in table to be unnested. 
Please use a different name.",
+          outputColumnName
+      );
+    } else {
+      availableDimensions.add(outputColumnName);
+    }
+    return new ListIndexed<>(Lists.newArrayList(availableDimensions));
+  }

Review Comment:
   Why is it bad for the output name to already exist?



##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+public class UnnestStorageAdapter implements StorageAdapter
+{
+  private final StorageAdapter baseAdapter;
+  private final String dimensionToUnnest;
+  private final String outputColumnName;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestStorageAdapter(
+      final StorageAdapter baseAdapter,
+      final String dimension,
+      final String outputColumnName,
+      final LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseAdapter = baseAdapter;
+    this.dimensionToUnnest = dimension;
+    this.outputColumnName = outputColumnName;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable Filter filter,
+      Interval interval,
+      VirtualColumns virtualColumns,
+      Granularity gran,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    Filter updatedFilter;
+    final InDimFilter allowListFilters;

Review Comment:
   This doesn't need to be defined outside of the if?



##########
processing/src/main/java/org/apache/druid/segment/DimensionUnnestCursor.java:
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.query.BaseQuery;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.IndexedInts;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.BitSet;
+import java.util.LinkedHashSet;
+
+public class DimensionUnnestCursor implements UnnestCursor
+{
+  private final Cursor baseCursor;
+  private final DimensionSelector dimSelector;
+  private final String columnName;
+  private final String outputName;
+  private final LinkedHashSet<String> allowSet;
+  private final BitSet allowedBitSet;
+  private final ColumnSelectorFactory baseColumnSelectorFactory;
+  private int index;
+  private IndexedInts indexedIntsForCurrentRow;
+  private boolean needInitialization;
+
+  public DimensionUnnestCursor(
+      Cursor cursor,
+      String columnName,
+      String outputColumnName,
+      LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseCursor = cursor;
+    this.baseColumnSelectorFactory = cursor.getColumnSelectorFactory();
+    this.dimSelector = 
this.baseColumnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(columnName));
+    this.columnName = columnName;
+    this.index = 0;
+    this.outputName = outputColumnName;
+    this.needInitialization = true;
+    this.allowSet = allowSet;
+    allowedBitSet = new BitSet();
+  }
+
+  @Override
+  public ColumnSelectorFactory getColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        if (!outputName.equals(dimensionSpec.getDimension())) {
+          return 
baseColumnSelectorFactory.makeDimensionSelector(dimensionSpec);
+        }
+
+        final DimensionSpec actualDimensionSpec = 
dimensionSpec.withDimension(columnName);
+        return new DimensionSelector()
+        {
+          @Override
+          public IndexedInts getRow()
+          {
+            return dimSelector.getRow();
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(@Nullable String value)
+          {
+            return new ValueMatcher()
+            {
+              @Override
+              public boolean matches()
+              {
+                return 
lookupName(indexedIntsForCurrentRow.get(index)).equals(value);
+              }
+
+              @Override
+              public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+              {
+                
baseColumnSelectorFactory.makeDimensionSelector(actualDimensionSpec).inspectRuntimeShape(inspector);
+              }
+            };
+          }
+
+          @Override
+          public ValueMatcher makeValueMatcher(Predicate<String> predicate)
+          {
+            return DimensionSelectorUtils.makeValueMatcherGeneric(this, 
predicate);
+          }
+
+          @Override
+          public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+          {
+            baseCursor.getColumnSelectorFactory()
+                      .makeDimensionSelector(actualDimensionSpec)
+                      .inspectRuntimeShape(inspector);
+          }
+
+          @Nullable
+          @Override
+          public Object getObject()
+          {
+            if (indexedIntsForCurrentRow != null) {
+              if (allowedBitSet.isEmpty()) {
+                if (allowSet == null || allowSet.isEmpty()) {
+                  return lookupName(indexedIntsForCurrentRow.get(index));
+                }
+              } else if 
(allowedBitSet.get(indexedIntsForCurrentRow.get(index))) {
+                return lookupName(indexedIntsForCurrentRow.get(index));
+              }
+            } else {
+              return null;
+            }
+            return null;
+          }
+
+          @Override
+          public Class<?> classOfObject()
+          {
+            return Object.class;
+          }
+
+          @Override
+          public int getValueCardinality()
+          {
+            return 0;

Review Comment:
   This value should be known.



##########
processing/src/test/java/org/apache/druid/query/UnnestQueryRunnerTest.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.testing.InitializedNullHandlingTest;
+
+// TODO: Use placementish column in the QueryRunnerHelperTest
+// and set up native queries
+public class UnnestQueryRunnerTest extends InitializedNullHandlingTest

Review Comment:
   Is it intentional that this test is completely empty?



##########
processing/src/main/java/org/apache/druid/segment/UnnestStorageAdapter.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.query.QueryMetrics;
+import org.apache.druid.query.filter.Filter;
+import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.data.Indexed;
+import org.apache.druid.segment.data.ListIndexed;
+import org.apache.druid.segment.filter.AndFilter;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+
+public class UnnestStorageAdapter implements StorageAdapter
+{
+  private final StorageAdapter baseAdapter;
+  private final String dimensionToUnnest;
+  private final String outputColumnName;
+  private final LinkedHashSet<String> allowSet;
+
+  public UnnestStorageAdapter(
+      final StorageAdapter baseAdapter,
+      final String dimension,
+      final String outputColumnName,
+      final LinkedHashSet<String> allowSet
+  )
+  {
+    this.baseAdapter = baseAdapter;
+    this.dimensionToUnnest = dimension;
+    this.outputColumnName = outputColumnName;
+    this.allowSet = allowSet;
+  }
+
+  @Override
+  public Sequence<Cursor> makeCursors(
+      @Nullable Filter filter,
+      Interval interval,
+      VirtualColumns virtualColumns,
+      Granularity gran,
+      boolean descending,
+      @Nullable QueryMetrics<?> queryMetrics
+  )
+  {
+    Filter updatedFilter;
+    final InDimFilter allowListFilters;
+    if (allowSet != null && !allowSet.isEmpty()) {
+      allowListFilters = new InDimFilter(dimensionToUnnest, allowSet);
+      if (filter != null) {
+        updatedFilter = new AndFilter(Arrays.asList(filter, allowListFilters));
+      } else {
+        updatedFilter = allowListFilters;
+      }
+    } else {
+      updatedFilter = filter;
+    }
+    final Sequence<Cursor> baseCursorSequence = baseAdapter.makeCursors(
+        updatedFilter,
+        interval,
+        virtualColumns,
+        gran,
+        descending,
+        queryMetrics
+    );
+
+    return Sequences.map(
+        baseCursorSequence,
+        cursor -> {
+          assert cursor != null;
+          Cursor retVal = cursor;
+          UnnestCursor retUnnestCursor;
+          ColumnCapabilities capabilities = 
cursor.getColumnSelectorFactory().getColumnCapabilities(dimensionToUnnest);
+          if (capabilities.isDictionaryEncoded() == 
ColumnCapabilities.Capable.TRUE
+              && capabilities.areDictionaryValuesUnique() == 
ColumnCapabilities.Capable.TRUE) {
+            retUnnestCursor = new DimensionUnnestCursor(retVal, 
dimensionToUnnest, outputColumnName, allowSet);
+          } else {
+            retUnnestCursor = new ColumnarValueUnnestCursor(retVal, 
dimensionToUnnest, outputColumnName, allowSet);
+          }
+          return retUnnestCursor;
+        }
+    );
+  }
+
+  @Override
+  public Interval getInterval()
+  {
+    return baseAdapter.getInterval();
+  }
+
+  @Override
+  public Indexed<String> getAvailableDimensions()
+  {
+    final LinkedHashSet<String> availableDimensions = new LinkedHashSet<>();
+
+    for (String dim : baseAdapter.getAvailableDimensions()) {
+      availableDimensions.add(dim);
+    }
+    // check to see if output name provided is already
+    // a part of available dimensions
+    if (availableDimensions.contains(outputColumnName)) {
+      throw new IAE(
+          "Provided output name [%s] already exists in table to be unnested. 
Please use a different name.",
+          outputColumnName
+      );
+    } else {
+      availableDimensions.add(outputColumnName);
+    }
+    return new ListIndexed<>(Lists.newArrayList(availableDimensions));
+  }
+
+  @Override
+  public Iterable<String> getAvailableMetrics()
+  {
+    return baseAdapter.getAvailableMetrics();
+  }
+
+  @Override
+  public int getDimensionCardinality(String column)
+  {
+    if (outputColumnName.equals(dimensionToUnnest)) {
+      return baseAdapter.getDimensionCardinality(column);
+    }
+    return baseAdapter.getDimensionCardinality(dimensionToUnnest);
+  }
+
+  @Override
+  public DateTime getMinTime()
+  {
+    return baseAdapter.getMinTime();
+  }
+
+  @Override
+  public DateTime getMaxTime()
+  {
+    return baseAdapter.getMaxTime();
+  }
+
+  @Nullable
+  @Override
+  public Comparable getMinValue(String column)
+  {
+    if (outputColumnName.equals(dimensionToUnnest)) {
+      return baseAdapter.getMinValue(column);
+    }
+    return baseAdapter.getMinValue(dimensionToUnnest);
+  }
+
+  @Nullable
+  @Override
+  public Comparable getMaxValue(String column)
+  {
+    if (outputColumnName.equals(dimensionToUnnest)) {
+      return baseAdapter.getMaxValue(column);
+    }
+    return baseAdapter.getMaxValue(dimensionToUnnest);
+  }
+
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    if (outputColumnName.equals(dimensionToUnnest)) {
+      return baseAdapter.getColumnCapabilities(column);
+    }
+    return baseAdapter.getColumnCapabilities(dimensionToUnnest);
+  }
+
+  @Override
+  public int getNumRows()
+  {
+    return 0;

Review Comment:
   I'm unsure if it's safe to return 0 from this...  We should double check 
what uses this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to