This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a2d326ff7aa more lenient array expression casting, fix inconsistencies 
and simplify json_value (#18078)
a2d326ff7aa is described below

commit a2d326ff7aaf5f2acbd6c8ad81a042f06b79c32f
Author: Clint Wylie <[email protected]>
AuthorDate: Fri Jun 6 03:05:44 2025 -0700

    more lenient array expression casting, fix inconsistencies and simplify 
json_value (#18078)
    
    changes:
    * the cast expression will now be more lenient when casting arrays of 
primitive values. Previously, single-element arrays were allowed to be cast to 
scalar values, while arrays with more than one element would result in a type 
cast exception. Casting multi-element arrays will now produce a null value of 
the target type instead of failing. This is more consistent with the behavior 
of other primitive value casts, which return null if the cast cannot be 
completed. Complex value casting will still throw type cast exceptions.
    * NestedFieldVirtualColumn has been simplified, removing many of its own 
selector implementations with built in casting in favor of delegating casts to 
expression selectors when casts are required
    * fix several places in NestedFieldVirtualColumn which would allow creating 
selectors that were not consistent with the expected output type of the virtual 
column, leading to inconsistent query results depending on what kind of 
selector was created for the underlying field.
    * NestedFieldVirtualColumn now uses JSON_VALUE and JSON_QUERY expressions 
when no underlying json columns are available to optimize (e.g. when using 
realtime queries), since the selectors previously used for this case were very 
similar to these expressions
    * Adjusted test data to cover more of the inconsistencies and fixed tests 
which were encoding incorrect behavior
---
 .../druid/msq/exec/MSQComplexGroupByTest.java      |   66 +-
 .../java/org/apache/druid/math/expr/ExprEval.java  |    6 +-
 .../java/org/apache/druid/math/expr/Parser.java    |   16 +
 .../druid/segment/AutoTypeColumnIndexer.java       |    5 +-
 .../apache/druid/segment/AutoTypeColumnMerger.java |    4 +-
 .../apache/druid/segment/column/ColumnType.java    |    9 +
 .../nested/CompressedNestedDataComplexColumn.java  |    9 +-
 .../apache/druid/segment/nested/FieldTypeInfo.java |    5 +-
 .../nested/NestedFieldDictionaryEncodedColumn.java |   18 +-
 .../apache/druid/segment/nested/VariantColumn.java |    5 +-
 .../druid/segment/virtual/ExpressionSelectors.java |  170 ++-
 .../segment/virtual/ExpressionVectorSelectors.java |   21 +-
 .../segment/virtual/NestedFieldVirtualColumn.java  | 1319 ++++++++------------
 .../java/org/apache/druid/math/expr/EvalTest.java  |   73 +-
 .../org/apache/druid/math/expr/ExprEvalTest.java   |   13 +-
 .../druid/query/scan/NestedDataScanQueryTest.java  |    2 +-
 .../java/org/apache/druid/segment/TestHelper.java  |    4 +-
 .../segment/virtual/ExpressionSelectorsTest.java   |  408 ++++++
 .../test/resources/nested-all-types-test-data.json |    2 +-
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |    2 +-
 .../sql/calcite/CalciteNestedDataQueryTest.java    |  340 ++++-
 21 files changed, 1435 insertions(+), 1062 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java
index db90d4a8a1e..3c0dd86b7f6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQComplexGroupByTest.java
@@ -168,22 +168,6 @@ public class MSQComplexGroupByTest extends MSQTestBase
                              ),
                              1L
                          },
-                         new Object[]{
-                             0L,
-                             StructuredData.wrap(
-                                 Map.of(
-                                     "a", 100,
-                                     "b", Map.of(
-                                         "x", "a",
-                                         "y", 1.1,
-                                         "z", List.of(1, 2, 3, 4)
-                                     ),
-                                     "c", 100,
-                                     "v", Collections.emptyList()
-                                 )
-                             ),
-                             1L
-                         },
                          new Object[]{
                              0L,
                              StructuredData.wrap(
@@ -247,6 +231,22 @@ public class MSQComplexGroupByTest extends MSQTestBase
                              ),
                              1L
                          },
+                         new Object[]{
+                             0L,
+                             StructuredData.wrap(
+                                 Map.of(
+                                     "a", 100,
+                                     "b", Map.of(
+                                         "x", "a",
+                                         "y", 1.1,
+                                         "z", List.of(1, 2, 3, 4)
+                                     ),
+                                     "c", List.of(100),
+                                     "v", Collections.emptyList()
+                                 )
+                             ),
+                             1L
+                         },
                          new Object[]{
                              0L,
                              StructuredData.wrap(Map.of("a", 300)),
@@ -303,22 +303,6 @@ public class MSQComplexGroupByTest extends MSQTestBase
                              ),
                              1L
                          },
-                         new Object[]{
-                             0L,
-                             StructuredData.wrap(
-                                 Map.of(
-                                     "a", 100,
-                                     "b", Map.of(
-                                         "x", "a",
-                                         "y", 1.1,
-                                         "z", List.of(1, 2, 3, 4)
-                                     ),
-                                     "c", 100,
-                                     "v", Collections.emptyList()
-                                 )
-                             ),
-                             1L
-                         },
                          new Object[]{
                              0L,
                              StructuredData.wrap(
@@ -382,6 +366,22 @@ public class MSQComplexGroupByTest extends MSQTestBase
                              ),
                              1L
                          },
+                         new Object[]{
+                             0L,
+                             StructuredData.wrap(
+                                 Map.of(
+                                     "a", 100,
+                                     "b", Map.of(
+                                         "x", "a",
+                                         "y", 1.1,
+                                         "z", List.of(1, 2, 3, 4)
+                                     ),
+                                     "c", List.of(100),
+                                     "v", Collections.emptyList()
+                                 )
+                             ),
+                             1L
+                         },
                          new Object[]{
                              0L,
                              StructuredData.wrap(Map.of("a", 300)),
@@ -435,11 +435,11 @@ public class MSQComplexGroupByTest extends MSQTestBase
                      .setQueryContext(context)
                      .setExpectedResultRows(List.of(
                          new 
Object[]{"{\"a\":600,\"b\":{\"x\":\"f\",\"y\":1.1,\"z\":[6,7,8,9]},\"c\":12.3,\"v\":\"b\"}"},
-                         new 
Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"c\":100,\"v\":[]}"},
                          new 
Object[]{"{\"a\":200,\"b\":{\"x\":\"b\",\"y\":1.1,\"z\":[2,4,6]},\"c\":[\"a\",\"b\"],\"v\":[]}"},
                          new 
Object[]{"{\"a\":400,\"b\":{\"x\":\"d\",\"y\":1.1,\"z\":[3,4]},\"c\":{\"a\":1},\"v\":[]}"},
                          new 
Object[]{"{\"a\":500,\"b\":{\"x\":\"e\",\"z\":[1,2,3,4]},\"c\":\"hello\",\"v\":\"a\"}"},
                          new 
Object[]{"{\"a\":700,\"b\":{\"x\":\"g\",\"y\":1.1,\"z\":[9,null,9,9]},\"c\":null,\"v\":[]}"},
+                         new 
Object[]{"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"c\":[100],\"v\":[]}"},
                          new Object[]{"{\"a\":300}"}
                      ))
                      .verifyResults();
diff --git a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java 
b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java
index 3634734bfe1..a755d7577eb 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/ExprEval.java
@@ -1309,17 +1309,17 @@ public abstract class ExprEval<T>
           if (value.length == 1) {
             return ExprEval.of(asString());
           }
-          break;
+          return ExprEval.ofType(castTo, null);
         case LONG:
           if (value.length == 1) {
             return isNumericNull() ? ExprEval.ofLong(null) : 
ExprEval.ofLong(asLong());
           }
-          break;
+          return ExprEval.ofType(castTo, null);
         case DOUBLE:
           if (value.length == 1) {
             return isNumericNull() ? ExprEval.ofDouble(null) : 
ExprEval.ofDouble(asDouble());
           }
-          break;
+          return ExprEval.ofType(castTo, null);
         case ARRAY:
           ExpressionType elementType = (ExpressionType) 
castTo.getElementType();
           Object[] cast = new Object[value.length];
diff --git a/processing/src/main/java/org/apache/druid/math/expr/Parser.java 
b/processing/src/main/java/org/apache/druid/math/expr/Parser.java
index e92dd130b7c..fd42daa30fd 100644
--- a/processing/src/main/java/org/apache/druid/math/expr/Parser.java
+++ b/processing/src/main/java/org/apache/druid/math/expr/Parser.java
@@ -144,6 +144,22 @@ public class Parser
     return withFlatten ? flatten(parsed) : parsed;
   }
 
+  /**
+   * Create an {@link IdentifierExpr} for some identifier
+   */
+  public static Expr identifier(String identifier)
+  {
+    return new IdentifierExpr(identifier);
+  }
+
+  /**
+   * Create a {@link StringExpr} for a string constant
+   */
+  public static Expr constant(String constant)
+  {
+    return new StringExpr(constant);
+  }
+
   /**
    * Flatten an {@link Expr}, evaluating expressions on constants where 
possible to simplify the {@link Expr}.
    */
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
index 28b679d49e2..3f0a6171734 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnIndexer.java
@@ -444,10 +444,7 @@ public class AutoTypeColumnIndexer implements 
DimensionIndexer<StructuredData, S
     }
     if (fieldIndexers.size() == 1 && 
fieldIndexers.containsKey(NestedPathFinder.JSON_PATH_ROOT)) {
       FieldIndexer rootField = 
fieldIndexers.get(NestedPathFinder.JSON_PATH_ROOT);
-      ColumnType logicalType = null;
-      for (ColumnType type : 
FieldTypeInfo.convertToSet(rootField.getTypes().getByteValue())) {
-        logicalType = ColumnType.leastRestrictiveType(logicalType, type);
-      }
+      ColumnType logicalType = 
ColumnType.leastRestrictiveType(FieldTypeInfo.convertToSet(rootField.getTypes().getByteValue()));
       if (logicalType != null) {
         // special handle empty arrays
         if (!rootField.getTypes().hasUntypedArray() || logicalType.isArray()) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
index 0f8ea4df145..33882d7de9d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/AutoTypeColumnMerger.java
@@ -244,9 +244,7 @@ public class AutoTypeColumnMerger implements 
DimensionMergerV9
         // pick the least restrictive type for the logical type
         isVariantType = true;
         variantTypeByte = rootTypes.getByteValue();
-        for (ColumnType type : 
FieldTypeInfo.convertToSet(rootTypes.getByteValue())) {
-          logicalType = ColumnType.leastRestrictiveType(logicalType, type);
-        }
+        logicalType = 
ColumnType.leastRestrictiveType(FieldTypeInfo.convertToSet(rootTypes.getByteValue()));
         // empty arrays can be missed since they don't have a type, so handle 
them here
         if (!logicalType.isArray() && hasArrays) {
           logicalType = ColumnTypeFactory.getInstance().ofArray(logicalType);
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java 
b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
index b670d8f4370..c136414158a 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java
@@ -235,4 +235,13 @@ public class ColumnType extends 
BaseTypeSignature<ValueType>
     return ColumnType.DOUBLE;
   }
 
+  @Nullable
+  public static ColumnType leastRestrictiveType(Iterable<ColumnType> types)
+  {
+    ColumnType leastRestrictiveType = null;
+    for (ColumnType type : types) {
+      leastRestrictiveType = leastRestrictiveType(leastRestrictiveType, type);
+    }
+    return leastRestrictiveType;
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
index a7c6f9df921..96c7258333b 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/CompressedNestedDataComplexColumn.java
@@ -900,9 +900,7 @@ public abstract class 
CompressedNestedDataComplexColumn<TStringDictionary extend
       }
     } else {
       fieldTypes = 
FieldTypeInfo.convertToSet(fieldInfo.getTypes(index).getByteValue());
-      for (ColumnType type : fieldTypes) {
-        leastRestrictiveType = 
ColumnType.leastRestrictiveType(leastRestrictiveType, type);
-      }
+      leastRestrictiveType = ColumnType.leastRestrictiveType(fieldTypes);
     }
     return leastRestrictiveType;
   }
@@ -1023,7 +1021,10 @@ public abstract class 
CompressedNestedDataComplexColumn<TStringDictionary extend
         ints = VSizeColumnarInts.readFromByteBuffer(dataBuffer);
       }
       ColumnType theType = types.getSingleType();
-      columnBuilder.setType(theType == null ? ColumnType.STRING : theType);
+      if (theType == null) {
+        theType = 
ColumnType.leastRestrictiveType(FieldTypeInfo.convertToSet(types.getByteValue()));
+      }
+      columnBuilder.setType(theType);
 
       GenericIndexed<ImmutableBitmap> rBitmaps = GenericIndexed.read(
           dataBuffer,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/FieldTypeInfo.java 
b/processing/src/main/java/org/apache/druid/segment/nested/FieldTypeInfo.java
index b00af7a132f..0ab7f613acf 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/FieldTypeInfo.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/FieldTypeInfo.java
@@ -254,10 +254,7 @@ public class FieldTypeInfo
       // adjust for empty array if needed
       if (types.hasUntypedArray()) {
         Set<ColumnType> columnTypes = 
FieldTypeInfo.convertToSet(types.getByteValue());
-        ColumnType leastRestrictive = null;
-        for (ColumnType type : columnTypes) {
-          leastRestrictive = ColumnType.leastRestrictiveType(leastRestrictive, 
type);
-        }
+        ColumnType leastRestrictive = 
ColumnType.leastRestrictiveType(columnTypes);
         if (leastRestrictive == null) {
           typeByte = add(typeByte, ColumnType.LONG_ARRAY);
         } else if (!leastRestrictive.isArray()) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldDictionaryEncodedColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldDictionaryEncodedColumn.java
index 09877ab4c69..11b21adbf9d 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldDictionaryEncodedColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/NestedFieldDictionaryEncodedColumn.java
@@ -113,11 +113,7 @@ public class 
NestedFieldDictionaryEncodedColumn<TStringDictionary extends Indexe
   )
   {
     this.types = types;
-    ColumnType leastRestrictive = null;
-    for (ColumnType type : FieldTypeInfo.convertToSet(types.getByteValue())) {
-      leastRestrictive = ColumnType.leastRestrictiveType(leastRestrictive, 
type);
-    }
-    this.logicalType = leastRestrictive;
+    this.logicalType = 
ColumnType.leastRestrictiveType(FieldTypeInfo.convertToSet(types.getByteValue()));
     this.logicalExpressionType = 
ExpressionType.fromColumnTypeStrict(logicalType);
     this.singleType = types.getSingleType();
     this.longsColumn = longsColumn;
@@ -163,11 +159,11 @@ public class 
NestedFieldDictionaryEncodedColumn<TStringDictionary extends Indexe
   public String lookupName(int id)
   {
     final int globalId = dictionary.get(id);
-    if (globalId < globalDictionary.size()) {
+    if (globalId < adjustLongId) {
       return StringUtils.fromUtf8Nullable(globalDictionary.get(globalId));
-    } else if (globalId < globalDictionary.size() + 
globalLongDictionary.size()) {
+    } else if (globalId < adjustDoubleId) {
       return String.valueOf(globalLongDictionary.get(globalId - adjustLongId));
-    } else if (globalId < globalDictionary.size() + 
globalLongDictionary.size() + globalDoubleDictionary.size()) {
+    } else if (globalId < adjustArrayId) {
       return String.valueOf(globalDoubleDictionary.get(globalId - 
adjustDoubleId));
     }
     return null;
@@ -251,11 +247,11 @@ public class 
NestedFieldDictionaryEncodedColumn<TStringDictionary extends Indexe
 
   private Object lookupGlobalScalarObject(int globalId)
   {
-    if (globalId < globalDictionary.size()) {
+    if (globalId < adjustLongId) {
       return StringUtils.fromUtf8Nullable(globalDictionary.get(globalId));
-    } else if (globalId < globalDictionary.size() + 
globalLongDictionary.size()) {
+    } else if (globalId < adjustDoubleId) {
       return globalLongDictionary.get(globalId - adjustLongId);
-    } else if (globalId < globalDictionary.size() + 
globalLongDictionary.size() + globalDoubleDictionary.size()) {
+    } else if (globalId < adjustArrayId) {
       return globalDoubleDictionary.get(globalId - adjustDoubleId);
     }
     throw new IllegalArgumentException("not a scalar in the dictionary");
diff --git 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
index 4dd8feca998..6014db28071 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/nested/VariantColumn.java
@@ -126,10 +126,7 @@ public class VariantColumn<TStringDictionary extends 
Indexed<ByteBuffer>>
     // use the variant type bytes if set, in current code the logical type 
should have been computed via this same means
     // however older versions of the code had a bug which could incorrectly 
classify mixed types as nested data
     if (variantTypeSetByte != null) {
-      ColumnType theType = null;
-      for (ColumnType type : FieldTypeInfo.convertToSet(variantTypeSetByte)) {
-        theType = ColumnType.leastRestrictiveType(theType, type);
-      }
+      ColumnType theType = 
ColumnType.leastRestrictiveType(FieldTypeInfo.convertToSet(variantTypeSetByte));
       if (theType != null) {
         // sign bit is used to indicate empty arrays, this
         if (variantTypeSetByte < 0 && !theType.isArray()) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
index 803dffb2f09..fd7d9db2961 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
@@ -73,56 +73,7 @@ public class ExpressionSelectors
   {
     final ColumnValueSelector<ExprEval> baseSelector = 
makeExprEvalSelector(columnSelectorFactory, expression);
 
-    return new ColumnValueSelector()
-    {
-      @Override
-      public double getDouble()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getDouble();
-      }
-
-      @Override
-      public float getFloat()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getFloat();
-      }
-
-      @Override
-      public long getLong()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getLong();
-      }
-
-      @Override
-      public boolean isNull()
-      {
-        return baseSelector.isNull();
-      }
-
-      @Nullable
-      @Override
-      public Object getObject()
-      {
-        // No need for null check on getObject() since baseSelector impls will 
never return null.
-        ExprEval eval = baseSelector.getObject();
-        return eval.valueOrDefault();
-      }
-
-      @Override
-      public Class classOfObject()
-      {
-        return Object.class;
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-      {
-        inspector.visit("baseSelector", baseSelector);
-      }
-    };
+    return new EvalUnwrappingColumnValueSelector(baseSelector);
   }
 
   public static ColumnValueSelector makeStringColumnValueSelector(
@@ -132,35 +83,8 @@ public class ExpressionSelectors
   {
     final ColumnValueSelector<ExprEval> baseSelector = 
makeExprEvalSelector(columnSelectorFactory, expression);
 
-    return new ColumnValueSelector()
+    return new EvalUnwrappingColumnValueSelector(baseSelector)
     {
-      @Override
-      public double getDouble()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getDouble();
-      }
-
-      @Override
-      public float getFloat()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getFloat();
-      }
-
-      @Override
-      public long getLong()
-      {
-        // No Assert for null handling as baseSelector already have it.
-        return baseSelector.getLong();
-      }
-
-      @Override
-      public boolean isNull()
-      {
-        return baseSelector.isNull();
-      }
-
       @Nullable
       @Override
       public Object getObject()
@@ -169,19 +93,34 @@ public class ExpressionSelectors
         ExprEval eval = baseSelector.getObject();
         return coerceEvalToObjectOrList(eval);
       }
+    };
+  }
 
+  /**
+   * Wrap the output of {@link ColumnValueSelector#getObject()} in {@link 
ExprEval#ofType(ExpressionType, Object)}
+   * using the supplied delegate type and cast using {@link ExprEval#castTo} 
to convert to the target type. If the
+   * delegate selector type is null, {@link ExprEval#ofType} falls back to 
using {@link ExprEval#bestEffortOf}, however
+   * the target type must be not null, or else this method will fail when 
attempting to convert to an expression type.
+   */
+  public static ColumnValueSelector<?> castColumnValueSelector(
+      RowIdSupplier rowIdSupplier,
+      ColumnValueSelector<?> delegate,
+      @Nullable ColumnType delegateType,
+      ColumnType castToType
+  )
+  {
+    final ExpressionType fromType = 
ExpressionType.fromColumnType(delegateType);
+    final ExpressionType toType = 
ExpressionType.fromColumnTypeStrict(castToType);
+    final ColumnValueSelector<ExprEval> cast = new 
BaseExpressionColumnValueSelector(rowIdSupplier)
+    {
       @Override
-      public Class classOfObject()
-      {
-        return Object.class;
-      }
-
-      @Override
-      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      protected ExprEval<?> eval()
       {
-        inspector.visit("baseSelector", baseSelector);
+        return ExprEval.ofType(fromType, delegate.getObject()).castTo(toType);
       }
     };
+
+    return new EvalUnwrappingColumnValueSelector(cast);
   }
 
   /**
@@ -534,4 +473,63 @@ public class ExpressionSelectors
     }
     return eval.valueOrDefault();
   }
+
+  /**
+   * Wraps a {@link ColumnValueSelector<ExprEval>} and calls {@link 
ExprEval#valueOrDefault()} on the output of
+   * {@link #baseSelector#getObject()} in {@link #getObject()}.
+   */
+  private static class EvalUnwrappingColumnValueSelector implements 
ColumnValueSelector
+  {
+    private final ColumnValueSelector<ExprEval> baseSelector;
+
+    public EvalUnwrappingColumnValueSelector(ColumnValueSelector<ExprEval> 
baseSelector)
+    {
+      this.baseSelector = baseSelector;
+    }
+
+    @Override
+    public double getDouble()
+    {
+      return baseSelector.getDouble();
+    }
+
+    @Override
+    public float getFloat()
+    {
+      return baseSelector.getFloat();
+    }
+
+    @Override
+    public long getLong()
+    {
+      return baseSelector.getLong();
+    }
+
+    @Override
+    public boolean isNull()
+    {
+      return baseSelector.isNull();
+    }
+
+    @Nullable
+    @Override
+    public Object getObject()
+    {
+      // No need for null check on getObject() since baseSelector impls will 
never return null.
+      ExprEval eval = baseSelector.getObject();
+      return eval.valueOrDefault();
+    }
+
+    @Override
+    public Class classOfObject()
+    {
+      return Object.class;
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      inspector.visit("baseSelector", baseSelector);
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
index 9c1d6c2c941..cb103f424f4 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
@@ -179,7 +179,7 @@ public class ExpressionVectorSelectors
       ColumnType castTo
   )
   {
-    ExpressionVectorInputBinding binding = new 
ExpressionVectorInputBinding(inspector);
+    final ExpressionVectorInputBinding binding = new 
ExpressionVectorInputBinding(inspector);
     binding.addNumeric(columnName, 
ExpressionType.fromColumnType(selectorType), selector);
     return new ExpressionVectorObjectSelector(
         CastToTypeVectorProcessor.cast(
@@ -190,6 +190,25 @@ public class ExpressionVectorSelectors
     );
   }
 
+  public static VectorObjectSelector castObject(
+      ReadableVectorInspector inspector,
+      String columnName,
+      VectorObjectSelector selector,
+      ColumnType selectorType,
+      ColumnType castTo
+  )
+  {
+    final ExpressionVectorInputBinding binding = new 
ExpressionVectorInputBinding(inspector);
+    binding.addObjectSelector(columnName, 
ExpressionType.fromColumnType(selectorType), selector);
+    return new ExpressionVectorObjectSelector(
+        CastToTypeVectorProcessor.cast(
+            VectorProcessors.identifier(binding, columnName),
+            ExpressionType.fromColumnType(castTo)
+        ),
+        binding
+    );
+  }
+
   public static VectorValueSelector castObjectSelectorToNumeric(
       ReadableVectorInspector inspector,
       String columnName,
diff --git 
a/processing/src/main/java/org/apache/druid/segment/virtual/NestedFieldVirtualColumn.java
 
b/processing/src/main/java/org/apache/druid/segment/virtual/NestedFieldVirtualColumn.java
index f222990fe2e..3429975b94c 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/virtual/NestedFieldVirtualColumn.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/virtual/NestedFieldVirtualColumn.java
@@ -25,14 +25,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Doubles;
-import org.apache.druid.common.guava.GuavaUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Numbers;
-import org.apache.druid.math.expr.Evals;
+import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExpressionType;
+import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.expression.NestedDataExpressions;
 import org.apache.druid.query.extraction.ExtractionFn;
 import org.apache.druid.query.filter.ColumnIndexSelector;
 import org.apache.druid.query.filter.DruidPredicateFactory;
@@ -108,6 +109,9 @@ import java.util.Set;
  */
 public class NestedFieldVirtualColumn implements VirtualColumn
 {
+  private static final NestedDataExpressions.JsonQueryExprMacro JSON_QUERY = 
new NestedDataExpressions.JsonQueryExprMacro();
+  private static final NestedDataExpressions.JsonValueExprMacro JSON_VALUE = 
new NestedDataExpressions.JsonValueExprMacro();
+
   private final String outputName;
   private final NestedFieldSpec fieldSpec;
 
@@ -177,6 +181,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     this(columnName, outputName, expectedType, null, null, path, false);
   }
 
+  @Nullable
   @Override
   public byte[] getCacheKey()
   {
@@ -208,6 +213,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     return fieldSpec.parts;
   }
 
+  @Nullable
   @JsonProperty
   public ColumnType getExpectedType()
   {
@@ -239,14 +245,27 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       ColumnSelectorFactory factory
   )
   {
-    // this column value selector is used for realtime queries, so we always 
process StructuredData
-    final ColumnValueSelector<?> baseSelector = 
factory.makeColumnValueSelector(fieldSpec.columnName);
-
-    // processFromRaw is true that means JSON_QUERY, which can return partial 
results, otherwise this virtual column
-    // is JSON_VALUE which only returns literals, so use the literal value 
selector instead
-    return fieldSpec.processFromRaw
-           ? new RawFieldColumnSelector(baseSelector, fieldSpec.parts)
-           : new RawFieldLiteralColumnValueSelector(baseSelector, 
fieldSpec.parts);
+    // realtime selectors have no optimization, fallback to 
json_query/json_value expressions
+    final Expr identifier = Parser.identifier(fieldSpec.columnName);
+    final Expr path = 
Parser.constant(NestedPathFinder.toNormalizedJsonPath(fieldSpec.parts));
+    final Expr jsonExpr;
+    if (fieldSpec.processFromRaw) {
+      // processFromRaw is true that means JSON_QUERY, which can return 
partial object results
+      jsonExpr = JSON_QUERY.apply(List.of(identifier, path));
+    } else {
+      // otherwise, this virtual column is JSON_VALUE which only returns 
primitives
+      final List<Expr> args;
+      if (fieldSpec.expectedType != null) {
+        final Expr castType = Parser.constant(
+            
ExpressionType.fromColumnTypeStrict(fieldSpec.expectedType).asTypeString()
+        );
+        args = List.of(identifier, path, castType);
+      } else {
+        args = List.of(identifier, path);
+      }
+      jsonExpr = JSON_VALUE.apply(args);
+    }
+    return ExpressionSelectors.makeColumnValueSelector(factory, jsonExpr);
   }
 
   @Nullable
@@ -282,6 +301,10 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     BaseColumn theColumn = holder.getColumn();
     if (theColumn instanceof NestedDataComplexColumn) {
       final NestedDataComplexColumn column = (NestedDataComplexColumn) 
theColumn;
+      final ColumnType logicalType = 
column.getFieldLogicalType(fieldSpec.parts);
+      if (logicalType != null && logicalType.isArray()) {
+        return new 
FieldDimensionSelector(column.makeColumnValueSelector(fieldSpec.parts, offset));
+      }
       return column.makeDimensionSelector(fieldSpec.parts, offset, 
extractionFn);
     }
 
@@ -302,7 +325,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       );
     }
 
-    if (fieldSpec.parts.size() == 1 && fieldSpec.parts.get(0) instanceof 
NestedPathArrayElement && theColumn instanceof VariantColumn) {
+    if (isRootArrayElementPathAndArrayColumn(theColumn)) {
       final VariantColumn<?> arrayColumn = (VariantColumn<?>) theColumn;
       ColumnValueSelector<?> arraySelector = 
arrayColumn.makeColumnValueSelector(offset);
       final int elementNumber = ((NestedPathArrayElement) 
fieldSpec.parts.get(0)).getIndex();
@@ -364,8 +387,18 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     }
 
     // "JSON_VALUE", which only returns literals, on a 
NestedDataComplexColumn, so we can use the fields value selector
+
     if (theColumn instanceof NestedDataComplexColumn) {
       final NestedDataComplexColumn column = (NestedDataComplexColumn) 
theColumn;
+      final ColumnType fieldType = column.getFieldLogicalType(fieldSpec.parts);
+      if (fieldType != null && fieldSpec.expectedType != null && 
!fieldSpec.expectedType.equals(fieldType)) {
+        return ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            column.makeColumnValueSelector(fieldSpec.parts, offset),
+            fieldType,
+            fieldSpec.expectedType
+        );
+      }
       return column.makeColumnValueSelector(fieldSpec.parts, offset);
     }
 
@@ -382,69 +415,14 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       return theColumn.makeColumnValueSelector(offset);
     }
 
-    if (fieldSpec.parts.size() == 1 && fieldSpec.parts.get(0) instanceof 
NestedPathArrayElement && theColumn instanceof VariantColumn) {
+    if (isRootArrayElementPathAndArrayColumn(theColumn)) {
       final VariantColumn<?> arrayColumn = (VariantColumn<?>) theColumn;
       ColumnValueSelector<?> arraySelector = 
arrayColumn.makeColumnValueSelector(offset);
       final int elementNumber = ((NestedPathArrayElement) 
fieldSpec.parts.get(0)).getIndex();
       if (elementNumber < 0) {
         throw new IAE("Cannot make array element selector, negative array 
index not supported");
       }
-      return new ColumnValueSelector<>()
-      {
-        @Override
-        public boolean isNull()
-        {
-          Object o = getObject();
-          return !(o instanceof Number);
-        }
-
-        @Override
-        public long getLong()
-        {
-          Object o = getObject();
-          return o instanceof Number ? ((Number) o).longValue() : 0L;
-        }
-
-        @Override
-        public float getFloat()
-        {
-          Object o = getObject();
-          return o instanceof Number ? ((Number) o).floatValue() : 0f;
-        }
-
-        @Override
-        public double getDouble()
-        {
-          Object o = getObject();
-          return o instanceof Number ? ((Number) o).doubleValue() : 0.0;
-        }
-
-        @Override
-        public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-        {
-          arraySelector.inspectRuntimeShape(inspector);
-        }
-
-        @Nullable
-        @Override
-        public Object getObject()
-        {
-          Object o = arraySelector.getObject();
-          if (o instanceof Object[]) {
-            Object[] array = (Object[]) o;
-            if (elementNumber < array.length) {
-              return array[elementNumber];
-            }
-          }
-          return null;
-        }
-
-        @Override
-        public Class<?> classOfObject()
-        {
-          return Object.class;
-        }
-      };
+      return new ArrayElementColumnValueSelector(arraySelector, elementNumber);
     }
 
     // we are not a nested column and are being asked for a path that will 
never exist, so we are nil selector
@@ -529,17 +507,8 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
         );
       }
       final VectorObjectSelector objectSelector = 
complexColumn.makeVectorObjectSelector(fieldSpec.parts, offset);
-      if (leastRestrictiveType != null &&
-          leastRestrictiveType.isArray() &&
-          fieldSpec.expectedType != null &&
-          !fieldSpec.expectedType.isArray()
-      ) {
-        final ExpressionType elementType = 
ExpressionType.fromColumnTypeStrict(leastRestrictiveType.getElementType());
-        final ExpressionType castTo = 
ExpressionType.fromColumnTypeStrict(fieldSpec.expectedType);
-        return makeVectorArrayToScalarObjectSelector(offset, objectSelector, 
elementType, castTo);
-      }
 
-      return objectSelector;
+      return castVectorObjectSelectorIfNeeded(columnName, offset, 
leastRestrictiveType, objectSelector);
     }
     // not a nested column, but we can still do stuff if the path is the 
'root', indicated by an empty path parts
     if (fieldSpec.parts.isEmpty()) {
@@ -559,19 +528,11 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
             fieldSpec.expectedType
         );
       }
-      // if the underlying column is array typed, the vector object selector 
it spits out will homogenize stuff to
-      // make all of the objects a consistent type, which is typically a good 
thing, but if we are doing mixed type
-      // stuff and expect the output type to be scalar typed, then we should 
coerce things to only extract the scalars
-      if (capabilities.isArray() && !fieldSpec.expectedType.isArray()) {
-        final VectorObjectSelector delegate = 
column.makeVectorObjectSelector(offset);
-        final ExpressionType elementType = 
ExpressionType.fromColumnTypeStrict(capabilities.getElementType());
-        final ExpressionType castTo = 
ExpressionType.fromColumnTypeStrict(fieldSpec.expectedType);
-        return makeVectorArrayToScalarObjectSelector(offset, delegate, 
elementType, castTo);
-      }
-      return column.makeVectorObjectSelector(offset);
+      final VectorObjectSelector delegate = 
column.makeVectorObjectSelector(offset);
+      return castVectorObjectSelectorIfNeeded(columnName, offset, 
capabilities.toColumnType(), delegate);
     }
 
-    if (fieldSpec.parts.size() == 1 && fieldSpec.parts.get(0) instanceof 
NestedPathArrayElement && column instanceof VariantColumn) {
+    if (isRootArrayElementPathAndArrayColumn(column)) {
       final VariantColumn<?> arrayColumn = (VariantColumn<?>) column;
       final ExpressionType elementType = ExpressionType.fromColumnTypeStrict(
           arrayColumn.getLogicalType().isArray() ? 
arrayColumn.getLogicalType().getElementType() : arrayColumn.getLogicalType()
@@ -584,46 +545,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       if (elementNumber < 0) {
         throw new IAE("Cannot make array element selector, negative array 
index not supported");
       }
-      return new VectorObjectSelector()
-      {
-        private final Object[] elements = new 
Object[arraySelector.getMaxVectorSize()];
-        private int id = ReadableVectorInspector.NULL_ID;
-
-        @Override
-        public Object[] getObjectVector()
-        {
-          if (offset.getId() != id) {
-            final Object[] delegate = arraySelector.getObjectVector();
-            for (int i = 0; i < arraySelector.getCurrentVectorSize(); i++) {
-              Object maybeArray = delegate[i];
-              if (maybeArray instanceof Object[]) {
-                Object[] anArray = (Object[]) maybeArray;
-                if (elementNumber < anArray.length) {
-                  elements[i] = ExprEval.ofType(elementType, 
anArray[elementNumber]).castTo(castTo).value();
-                } else {
-                  elements[i] = null;
-                }
-              } else {
-                elements[i] = null;
-              }
-            }
-            id = offset.getId();
-          }
-          return elements;
-        }
-
-        @Override
-        public int getMaxVectorSize()
-        {
-          return arraySelector.getMaxVectorSize();
-        }
-
-        @Override
-        public int getCurrentVectorSize()
-        {
-          return arraySelector.getCurrentVectorSize();
-        }
-      };
+      return new ArrayElementVectorObjectSelector(arraySelector, offset, 
elementNumber, elementType, castTo);
     }
 
     // we are not a nested column and are being asked for a path that will 
never exist, so we are nil selector
@@ -652,147 +574,20 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
         // ... revisit this if that ever changes)
         if (theColumn instanceof DictionaryEncodedColumn) {
           final VectorObjectSelector delegate = 
theColumn.makeVectorObjectSelector(offset);
-          if (fieldSpec.expectedType != null && 
fieldSpec.expectedType.is(ValueType.LONG)) {
-            return new BaseLongVectorValueSelector(offset)
-            {
-              private int currentOffsetId = ReadableVectorInspector.NULL_ID;
-              private final long[] longs = new 
long[delegate.getMaxVectorSize()];
-              @Nullable
-              private boolean[] nulls = null;
-
-              @Override
-              public long[] getLongVector()
-              {
-                computeLongs();
-                return longs;
-              }
-
-              @Nullable
-              @Override
-              public boolean[] getNullVector()
-              {
-                computeLongs();
-                return nulls;
-              }
-
-              private void computeLongs()
-              {
-                if (currentOffsetId != offset.getId()) {
-                  currentOffsetId = offset.getId();
-                  final Object[] values = delegate.getObjectVector();
-                  for (int i = 0; i < values.length; i++) {
-                    Number n = 
ExprEval.computeNumber(Evals.asString(values[i]));
-                    if (n != null) {
-                      longs[i] = n.longValue();
-                      if (nulls != null) {
-                        nulls[i] = false;
-                      }
-                    } else {
-                      if (nulls == null) {
-                        nulls = new boolean[offset.getMaxVectorSize()];
-                      }
-                      nulls[i] = true;
-                    }
-                  }
-                }
-              }
-            };
-          } else if (fieldSpec.expectedType != null && 
fieldSpec.expectedType.is(ValueType.FLOAT)) {
-            return new BaseFloatVectorValueSelector(offset)
-            {
-              private int currentOffsetId = ReadableVectorInspector.NULL_ID;
-              private final float[] floats = new 
float[delegate.getMaxVectorSize()];
-              @Nullable
-              private boolean[] nulls = null;
-
-              @Override
-              public float[] getFloatVector()
-              {
-                computeFloats();
-                return floats;
-              }
-
-              @Nullable
-              @Override
-              public boolean[] getNullVector()
-              {
-                computeFloats();
-                return nulls;
-              }
-
-              private void computeFloats()
-              {
-                if (currentOffsetId != offset.getId()) {
-                  currentOffsetId = offset.getId();
-                  final Object[] values = delegate.getObjectVector();
-                  for (int i = 0; i < values.length; i++) {
-                    Number n = 
ExprEval.computeNumber(Evals.asString(values[i]));
-                    if (n != null) {
-                      floats[i] = n.floatValue();
-                      if (nulls != null) {
-                        nulls[i] = false;
-                      }
-                    } else {
-                      if (nulls == null) {
-                        nulls = new boolean[offset.getMaxVectorSize()];
-                      }
-                      nulls[i] = true;
-                    }
-                  }
-                }
-              }
-            };
-          } else {
-            return new BaseDoubleVectorValueSelector(offset)
-            {
-              private int currentOffsetId = ReadableVectorInspector.NULL_ID;
-              private final double[] doubles = new 
double[delegate.getMaxVectorSize()];
-              @Nullable
-              private boolean[] nulls = null;
-              @Override
-              public double[] getDoubleVector()
-              {
-                computeDoubles();
-                return doubles;
-              }
-
-              @Nullable
-              @Override
-              public boolean[] getNullVector()
-              {
-                computeDoubles();
-                return nulls;
-              }
-
-              private void computeDoubles()
-              {
-                if (currentOffsetId != offset.getId()) {
-                  currentOffsetId = offset.getId();
-                  final Object[] values = delegate.getObjectVector();
-                  for (int i = 0; i < values.length; i++) {
-                    Number n = 
ExprEval.computeNumber(Evals.asString(values[i]));
-                    if (n != null) {
-                      doubles[i] = n.doubleValue();
-                      if (nulls != null) {
-                        nulls[i] = false;
-                      }
-                    } else {
-                      if (nulls == null) {
-                        nulls = new boolean[offset.getMaxVectorSize()];
-                      }
-                      nulls[i] = true;
-                    }
-                  }
-                }
-              }
-            };
-          }
+          final ColumnType castTo = fieldSpec.expectedType != null ? 
fieldSpec.expectedType : ColumnType.DOUBLE;
+          return ExpressionVectorSelectors.castObjectSelectorToNumeric(
+              offset,
+              columnName,
+              delegate,
+              holder.getCapabilities().toColumnType(),
+              castTo
+          );
         }
         // otherwise, just use the columns native vector value selector (this 
might explode if not natively numeric)
         return theColumn.makeVectorValueSelector(offset);
       }
       // array columns can also be handled if the path is a root level array 
element accessor
-      if (fieldSpec.parts.size() == 1 && fieldSpec.parts.get(0) instanceof 
NestedPathArrayElement && theColumn instanceof VariantColumn) {
+      if (isRootArrayElementPathAndArrayColumn(theColumn)) {
         final VariantColumn<?> arrayColumn = (VariantColumn<?>) theColumn;
         VectorObjectSelector arraySelector = 
arrayColumn.makeVectorObjectSelector(offset);
         final int elementNumber = ((NestedPathArrayElement) 
fieldSpec.parts.get(0)).getIndex();
@@ -801,210 +596,11 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
         }
 
         if (fieldSpec.expectedType != null && 
fieldSpec.expectedType.is(ValueType.LONG)) {
-          return new BaseLongVectorValueSelector(offset)
-          {
-            private final long[] longs = new long[offset.getMaxVectorSize()];
-            private final boolean[] nulls = new 
boolean[offset.getMaxVectorSize()];
-            private int id = ReadableVectorInspector.NULL_ID;
-
-            private void computeNumbers()
-            {
-              if (offset.getId() != id) {
-                final Object[] maybeArrays = arraySelector.getObjectVector();
-                for (int i = 0; i < arraySelector.getCurrentVectorSize(); i++) 
{
-                  Object maybeArray = maybeArrays[i];
-                  if (maybeArray instanceof Object[]) {
-                    Object[] anArray = (Object[]) maybeArray;
-                    if (elementNumber < anArray.length) {
-                      if (anArray[elementNumber] instanceof Number) {
-                        Number n = (Number) anArray[elementNumber];
-                        longs[i] = n.longValue();
-                        nulls[i] = false;
-                      } else {
-                        Double d = anArray[elementNumber] instanceof String
-                                   ? Doubles.tryParse((String) 
anArray[elementNumber])
-                                   : null;
-                        if (d != null) {
-                          longs[i] = d.longValue();
-                          nulls[i] = false;
-                        } else {
-                          longs[i] = 0L;
-                          nulls[i] = true;
-                        }
-                      }
-                    } else {
-                      nullElement(i);
-                    }
-                  } else {
-                    // not an array?
-                    nullElement(i);
-                  }
-                }
-                id = offset.getId();
-              }
-            }
-
-            private void nullElement(int i)
-            {
-              longs[i] = 0L;
-              nulls[i] = true;
-            }
-
-            @Override
-            public long[] getLongVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return longs;
-            }
-
-            @Nullable
-            @Override
-            public boolean[] getNullVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return nulls;
-            }
-          };
+          return new ArrayElementLongVectorValueSelector(offset, 
arraySelector, elementNumber);
         } else if (fieldSpec.expectedType != null && 
fieldSpec.expectedType.is(ValueType.FLOAT)) {
-          return new BaseFloatVectorValueSelector(offset)
-          {
-            private final float[] floats = new 
float[offset.getMaxVectorSize()];
-            private final boolean[] nulls = new 
boolean[offset.getMaxVectorSize()];
-            private int id = ReadableVectorInspector.NULL_ID;
-
-            private void computeNumbers()
-            {
-              if (offset.getId() != id) {
-                final Object[] maybeArrays = arraySelector.getObjectVector();
-                for (int i = 0; i < arraySelector.getCurrentVectorSize(); i++) 
{
-                  Object maybeArray = maybeArrays[i];
-                  if (maybeArray instanceof Object[]) {
-                    Object[] anArray = (Object[]) maybeArray;
-                    if (elementNumber < anArray.length) {
-                      if (anArray[elementNumber] instanceof Number) {
-                        Number n = (Number) anArray[elementNumber];
-                        floats[i] = n.floatValue();
-                        nulls[i] = false;
-                      } else {
-                        Double d = anArray[elementNumber] instanceof String
-                                   ? Doubles.tryParse((String) 
anArray[elementNumber])
-                                   : null;
-                        if (d != null) {
-                          floats[i] = d.floatValue();
-                          nulls[i] = false;
-                        } else {
-                          nullElement(i);
-                        }
-                      }
-                    } else {
-                      nullElement(i);
-                    }
-                  } else {
-                    // not an array?
-                    nullElement(i);
-                  }
-                }
-                id = offset.getId();
-              }
-            }
-
-            private void nullElement(int i)
-            {
-              floats[i] = 0f;
-              nulls[i] = true;
-            }
-
-            @Override
-            public float[] getFloatVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return floats;
-            }
-
-            @Nullable
-            @Override
-            public boolean[] getNullVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return nulls;
-            }
-          };
+          return new ArrayElementFloatVectorValueSelector(offset, 
arraySelector, elementNumber);
         } else {
-          return new BaseDoubleVectorValueSelector(offset)
-          {
-            private final double[] doubles = new 
double[offset.getMaxVectorSize()];
-            private final boolean[] nulls = new 
boolean[offset.getMaxVectorSize()];
-            private int id = ReadableVectorInspector.NULL_ID;
-
-            private void computeNumbers()
-            {
-              if (offset.getId() != id) {
-                final Object[] maybeArrays = arraySelector.getObjectVector();
-                for (int i = 0; i < arraySelector.getCurrentVectorSize(); i++) 
{
-                  Object maybeArray = maybeArrays[i];
-                  if (maybeArray instanceof Object[]) {
-                    Object[] anArray = (Object[]) maybeArray;
-                    if (elementNumber < anArray.length) {
-                      if (anArray[elementNumber] instanceof Number) {
-                        Number n = (Number) anArray[elementNumber];
-                        doubles[i] = n.doubleValue();
-                        nulls[i] = false;
-                      } else {
-                        Double d = anArray[elementNumber] instanceof String
-                                   ? Doubles.tryParse((String) 
anArray[elementNumber])
-                                   : null;
-                        if (d != null) {
-                          doubles[i] = d;
-                          nulls[i] = false;
-                        } else {
-                          nullElement(i);
-                        }
-                      }
-                    } else {
-                      nullElement(i);
-                    }
-                  } else {
-                    // not an array?
-                    nullElement(i);
-                  }
-                }
-                id = offset.getId();
-              }
-            }
-
-            private void nullElement(int i)
-            {
-              doubles[i] = 0.0;
-              nulls[i] = true;
-            }
-
-            @Override
-            public double[] getDoubleVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return doubles;
-            }
-
-            @Nullable
-            @Override
-            public boolean[] getNullVector()
-            {
-              if (offset.getId() != id) {
-                computeNumbers();
-              }
-              return nulls;
-            }
-          };
+          return new ArrayElementDoubleVectorValueSelector(offset, 
arraySelector, elementNumber);
         }
       }
       return NilVectorSelector.create(offset);
@@ -1013,157 +609,20 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     final NestedDataComplexColumn column = (NestedDataComplexColumn) theColumn;
     // if column is numeric, it has a vector value selector, so we can 
directly make a vector value selector
     // if we are missing an expectedType, then we've got nothing else to work 
with so try it anyway
+    final ColumnType leastRestrictiveType = 
column.getFieldLogicalType(fieldSpec.parts);
+
     if (column.isNumeric(fieldSpec.parts) || fieldSpec.expectedType == null) {
       return column.makeVectorValueSelector(fieldSpec.parts, offset);
     }
 
-    final ColumnType leastRestrictiveType = 
column.getFieldLogicalType(fieldSpec.parts);
     final VectorObjectSelector fieldSelector = 
column.makeVectorObjectSelector(fieldSpec.parts, offset);
-    final VectorObjectSelector objectSelector;
-    // if the field has array types, wrap the object selector in the array to 
scalar value coercer
-    if (leastRestrictiveType != null && leastRestrictiveType.isArray()) {
-      objectSelector = makeVectorArrayToScalarObjectSelector(
-          offset,
-          fieldSelector,
-          
ExpressionType.fromColumnTypeStrict(leastRestrictiveType.getElementType()),
-          ExpressionType.fromColumnTypeStrict(fieldSpec.expectedType)
-      );
-    } else {
-      objectSelector = fieldSelector;
-    }
-    if (fieldSpec.expectedType.is(ValueType.LONG)) {
-      return new BaseLongVectorValueSelector(offset)
-      {
-        private final long[] longVector = new long[offset.getMaxVectorSize()];
-
-        @Nullable
-        private boolean[] nullVector = null;
-        private int id = ReadableVectorInspector.NULL_ID;
-
-        @Override
-        public long[] getLongVector()
-        {
-          computeVectorsIfNeeded();
-          return longVector;
-        }
-
-        @Nullable
-        @Override
-        public boolean[] getNullVector()
-        {
-          computeVectorsIfNeeded();
-          return nullVector;
-        }
-
-        private void computeVectorsIfNeeded()
-        {
-          if (id == offset.getId()) {
-            return;
-          }
-          id = offset.getId();
-          final Object[] vals = objectSelector.getObjectVector();
-          for (int i = 0; i < objectSelector.getCurrentVectorSize(); i++) {
-            Object v = vals[i];
-            if (v == null) {
-              if (nullVector == null) {
-                nullVector = new boolean[objectSelector.getMaxVectorSize()];
-              }
-              longVector[i] = 0L;
-              nullVector[i] = true;
-            } else {
-              Long l;
-              if (v instanceof Number) {
-                l = ((Number) v).longValue();
-              } else {
-                final String s = String.valueOf(v);
-                l = GuavaUtils.tryParseLong(s);
-                if (l == null) {
-                  final Double d = Doubles.tryParse(s);
-                  if (d != null) {
-                    l = d.longValue();
-                  }
-                }
-              }
-              if (l != null) {
-                longVector[i] = l;
-                if (nullVector != null) {
-                  nullVector[i] = false;
-                }
-              } else {
-                if (nullVector == null) {
-                  nullVector = new boolean[objectSelector.getMaxVectorSize()];
-                }
-                longVector[i] = 0L;
-                nullVector[i] = true;
-              }
-            }
-          }
-        }
-      };
-    } else {
-      // treat anything else as double
-      return new BaseDoubleVectorValueSelector(offset)
-      {
-        private final double[] doubleVector = new 
double[offset.getMaxVectorSize()];
-
-        @Nullable
-        private boolean[] nullVector = null;
-        private int id = ReadableVectorInspector.NULL_ID;
-
-        @Override
-        public double[] getDoubleVector()
-        {
-          computeVectorsIfNeeded();
-          return doubleVector;
-        }
-
-        @Nullable
-        @Override
-        public boolean[] getNullVector()
-        {
-          computeVectorsIfNeeded();
-          return nullVector;
-        }
-
-        private void computeVectorsIfNeeded()
-        {
-          if (id == offset.getId()) {
-            return;
-          }
-          id = offset.getId();
-          final Object[] vals = objectSelector.getObjectVector();
-          for (int i = 0; i < objectSelector.getCurrentVectorSize(); i++) {
-            Object v = vals[i];
-            if (v == null) {
-              if (nullVector == null) {
-                nullVector = new boolean[objectSelector.getMaxVectorSize()];
-              }
-              doubleVector[i] = 0.0;
-              nullVector[i] = true;
-            } else {
-              Double d;
-              if (v instanceof Number) {
-                d = ((Number) v).doubleValue();
-              } else {
-                d = Doubles.tryParse(String.valueOf(v));
-              }
-              if (d != null) {
-                doubleVector[i] = d;
-                if (nullVector != null) {
-                  nullVector[i] = false;
-                }
-              } else {
-                if (nullVector == null) {
-                  nullVector = new boolean[objectSelector.getMaxVectorSize()];
-                }
-                doubleVector[i] = 0.0;
-                nullVector[i] = true;
-              }
-            }
-          }
-        }
-      };
-    }
+    return ExpressionVectorSelectors.castObjectSelectorToNumeric(
+        offset,
+        columnName,
+        fieldSelector,
+        leastRestrictiveType,
+        fieldSpec.expectedType
+    );
   }
 
   @Nullable
@@ -1213,7 +672,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       }
       return baseIndexSupplier;
     }
-    if (fieldSpec.parts.size() == 1 && fieldSpec.parts.get(0) instanceof 
NestedPathArrayElement && theColumn instanceof VariantColumn) {
+    if (isRootArrayElementPathAndArrayColumn(theColumn)) {
       // cannot use the array column index supplier directly, in the future 
array columns should expose a function
       // with a signature like 'getArrayElementIndexSupplier(int index)' to 
allow getting indexes for specific elements
       // if we want to support this stuff. Right now VariantArrayColumn 
doesn't actually retain enough information about
@@ -1347,58 +806,32 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
   }
 
   /**
-   * Create a {@link VectorObjectSelector} from a base selector which may 
return ARRAY types, coercing to some scalar
-   * value. Single element arrays will be unwrapped, while multi-element 
arrays will become null values. Non-arrays
-   * will be best effort cast to the castTo type.
+   * Returns true if json path is a root array element (for example '$[1]') 
and the column is an array column
    */
-  private static VectorObjectSelector makeVectorArrayToScalarObjectSelector(
+  private boolean isRootArrayElementPathAndArrayColumn(BaseColumn theColumn)
+  {
+    return fieldSpec.parts.size() == 1
+           && fieldSpec.parts.get(0) instanceof NestedPathArrayElement
+           && theColumn instanceof VariantColumn;
+  }
+
+  private VectorObjectSelector castVectorObjectSelectorIfNeeded(
+      String columnName,
       ReadableVectorOffset offset,
-      VectorObjectSelector delegate,
-      ExpressionType elementType,
-      ExpressionType castTo
+      ColumnType leastRestrictiveType,
+      VectorObjectSelector objectSelector
   )
   {
-    return new VectorObjectSelector()
-    {
-      final Object[] scalars = new Object[offset.getMaxVectorSize()];
-      private int id = ReadableVectorInspector.NULL_ID;
-
-      @Override
-      public Object[] getObjectVector()
-      {
-        if (offset.getId() != id) {
-          Object[] result = delegate.getObjectVector();
-          for (int i = 0; i < offset.getCurrentVectorSize(); i++) {
-            if (result[i] instanceof Object[]) {
-              Object[] o = (Object[]) result[i];
-              if (o == null || o.length != 1) {
-                scalars[i] = null;
-              } else {
-                ExprEval<?> element = ExprEval.ofType(elementType, o[0]);
-                scalars[i] = element.castTo(castTo).value();
-              }
-            } else {
-              ExprEval<?> element = ExprEval.bestEffortOf(result[i]);
-              scalars[i] = element.castTo(castTo).value();
-            }
-          }
-          id = offset.getId();
-        }
-        return scalars;
-      }
-
-      @Override
-      public int getMaxVectorSize()
-      {
-        return offset.getMaxVectorSize();
-      }
-
-      @Override
-      public int getCurrentVectorSize()
-      {
-        return offset.getCurrentVectorSize();
-      }
-    };
+    if (fieldSpec.expectedType != null && 
!Objects.equals(fieldSpec.expectedType, leastRestrictiveType)) {
+      return ExpressionVectorSelectors.castObject(
+          offset,
+          columnName,
+          objectSelector,
+          leastRestrictiveType,
+          fieldSpec.expectedType
+      );
+    }
+    return objectSelector;
   }
 
   private static class NestedFieldSpec implements EquivalenceKey
@@ -1445,159 +878,12 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     }
   }
 
-  /**
-   * Process the "raw" data to extract non-complex values. Like {@link 
RawFieldColumnSelector} but does not return
-   * complex nested objects and does not wrap the results in {@link 
StructuredData}.
-   * <p>
-   * This is used as a selector on realtime data when the native field columns 
are not available.
-   */
-  public static class RawFieldLiteralColumnValueSelector extends 
RawFieldColumnSelector
-  {
-    public RawFieldLiteralColumnValueSelector(
-        ColumnValueSelector baseSelector,
-        List<NestedPathPart> parts
-    )
-    {
-      super(baseSelector, parts);
-    }
-
-    @Override
-    public double getDouble()
-    {
-      Object o = getObject();
-      return Numbers.tryParseDouble(o, 0.0);
-    }
-
-    @Override
-    public float getFloat()
-    {
-      Object o = getObject();
-      return Numbers.tryParseFloat(o, 0.0f);
-    }
-
-    @Override
-    public long getLong()
-    {
-      Object o = getObject();
-      return Numbers.tryParseLong(o, 0L);
-    }
-
-    @Override
-    public boolean isNull()
-    {
-      final Object o = getObject();
-      if (o instanceof Number) {
-        return false;
-      }
-      if (o instanceof String) {
-        return GuavaUtils.tryParseLong((String) o) == null && 
Doubles.tryParse((String) o) == null;
-      }
-      return true;
-    }
-
-    @Nullable
-    @Override
-    public Object getObject()
-    {
-      final StructuredData data = 
StructuredData.wrap(baseSelector.getObject());
-      if (data == null) {
-        return null;
-      }
-
-      final Object valAtPath = NestedPathFinder.find(data.getValue(), parts);
-      final ExprEval eval = ExprEval.bestEffortOf(valAtPath);
-      if (eval.type().isPrimitive() || eval.type().isPrimitiveArray()) {
-        return eval.valueOrDefault();
-      }
-      // not a primitive value, return null;
-      return null;
-    }
-
-  }
-
-  /**
-   * Process the "raw" data to extract values with {@link 
NestedPathFinder#find(Object, List)}, wrapping the result in
-   * {@link StructuredData}
-   */
-  public static class RawFieldColumnSelector implements 
ColumnValueSelector<Object>
-  {
-    protected final ColumnValueSelector baseSelector;
-    protected final List<NestedPathPart> parts;
-
-    public RawFieldColumnSelector(ColumnValueSelector baseSelector, 
List<NestedPathPart> parts)
-    {
-      this.baseSelector = baseSelector;
-      this.parts = parts;
-    }
-
-    @Override
-    public double getDouble()
-    {
-      StructuredData data = (StructuredData) getObject();
-      if (data != null) {
-        return Numbers.tryParseDouble(data.getValue(), 0.0);
-      }
-      return 0.0;
-    }
-
-    @Override
-    public float getFloat()
-    {
-      StructuredData data = (StructuredData) getObject();
-      if (data != null) {
-        return Numbers.tryParseFloat(data.getValue(), 0f);
-      }
-      return 0f;
-    }
-
-    @Override
-    public long getLong()
-    {
-      StructuredData data = (StructuredData) getObject();
-      if (data != null) {
-        return Numbers.tryParseLong(data.getValue(), 0L);
-      }
-      return 0L;
-    }
-
-    @Override
-    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
-    {
-      inspector.visit("baseSelector", baseSelector);
-      inspector.visit("parts", parts);
-    }
-
-    @Override
-    public boolean isNull()
-    {
-      StructuredData data = (StructuredData) getObject();
-      if (data == null) {
-        return true;
-      }
-      Object o = data.getValue();
-      return !(o instanceof Number || (o instanceof String && 
Doubles.tryParse((String) o) != null));
-    }
-
-    @Nullable
-    @Override
-    public Object getObject()
-    {
-      StructuredData data = StructuredData.wrap(baseSelector.getObject());
-      return StructuredData.wrap(NestedPathFinder.find(data == null ? null : 
data.getValue(), parts));
-    }
-
-    @Override
-    public Class<?> classOfObject()
-    {
-      return Object.class;
-    }
-  }
 
   /**
    * Process the "raw" data to extract vectors of values with {@link 
NestedPathFinder#find(Object, List)}, wrapping the
    * result in {@link StructuredData}
    */
-  public static class RawFieldVectorObjectSelector implements 
VectorObjectSelector
+  public static final class RawFieldVectorObjectSelector implements 
VectorObjectSelector
   {
     private final VectorObjectSelector baseSelector;
     private final List<NestedPathPart> parts;
@@ -1642,7 +928,10 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
     }
   }
 
-  public static class FieldDimensionSelector extends 
BaseSingleValueDimensionSelector
+  /**
+   * Create a {@link DimensionSelector} for a nested field on top of a {@link 
ColumnValueSelector}
+   */
+  public static final class FieldDimensionSelector extends 
BaseSingleValueDimensionSelector
   {
     private final ColumnValueSelector<?> valueSelector;
 
@@ -1665,6 +954,13 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       if (val == null || val instanceof String) {
         return (String) val;
       }
+      if (val instanceof Object[]) {
+        Object[] arrayVal = (Object[]) val;
+        if (arrayVal.length == 1) {
+          return String.valueOf(arrayVal[0]);
+        }
+        return null;
+      }
       return String.valueOf(val);
     }
   }
@@ -1677,7 +973,7 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
    * This is used as a fall-back when making a selector and the underlying 
column is NOT a
    * {@link NestedDataComplexColumn}, whose field {@link DimensionSelector} 
natively implement this behavior.
    */
-  private static class BestEffortCastingValueSelector implements 
DimensionSelector
+  private static final class BestEffortCastingValueSelector implements 
DimensionSelector
   {
     private final DimensionSelector baseSelector;
 
@@ -1803,4 +1099,403 @@ public class NestedFieldVirtualColumn implements 
VirtualColumn
       return baseSelector.idLookup();
     }
   }
+
+  /**
+   * {@link BaseLongVectorValueSelector} exposing one fixed position of each array read from an array typed
+   * {@link VectorObjectSelector}.
+   * <p>
+   * {@link Number} elements are converted with {@link Number#longValue()}. {@link String} elements are parsed with
+   * {@link Doubles#tryParse(String)} and the parsed double is converted with {@link Double#longValue()}. Rows that
+   * are not arrays, arrays that are too short, and any other kind of element are reported as null with a stored
+   * value of 0.
+   * <p>
+   * Computed vectors are cached and only rebuilt when the id of the offset differs from the last id seen.
+   */
+  private static final class ArrayElementLongVectorValueSelector extends BaseLongVectorValueSelector
+  {
+    private final long[] longs;
+    private final boolean[] nulls;
+    private final VectorObjectSelector arraySelector;
+    private final int elementNumber;
+    private int id;
+
+    public ArrayElementLongVectorValueSelector(
+        ReadableVectorOffset offset,
+        VectorObjectSelector arraySelector,
+        int elementNumber
+    )
+    {
+      super(offset);
+      this.arraySelector = arraySelector;
+      this.elementNumber = elementNumber;
+      longs = new long[offset.getMaxVectorSize()];
+      nulls = new boolean[offset.getMaxVectorSize()];
+      id = ReadableVectorInspector.NULL_ID;
+    }
+
+    /**
+     * Fills {@link #longs} and {@link #nulls} for the current vector, unless they were already computed for the
+     * current offset id.
+     */
+    private void computeNumbers()
+    {
+      if (offset.getId() == id) {
+        return;
+      }
+      final Object[] rows = arraySelector.getObjectVector();
+      for (int row = 0; row < arraySelector.getCurrentVectorSize(); row++) {
+        final Object element = elementAt(rows[row]);
+        if (element instanceof Number) {
+          longs[row] = ((Number) element).longValue();
+          nulls[row] = false;
+          continue;
+        }
+        // only strings get a second chance at becoming a number, everything else (including null) is null
+        final Double parsed = element instanceof String ? Doubles.tryParse((String) element) : null;
+        if (parsed == null) {
+          nullElement(row);
+        } else {
+          longs[row] = parsed.longValue();
+          nulls[row] = false;
+        }
+      }
+      id = offset.getId();
+    }
+
+    /**
+     * Returns the element at position {@link #elementNumber} when {@code row} is an array long enough to contain
+     * it, otherwise null.
+     */
+    @Nullable
+    private Object elementAt(Object row)
+    {
+      if (!(row instanceof Object[])) {
+        // not an array?
+        return null;
+      }
+      final Object[] values = (Object[]) row;
+      return elementNumber < values.length ? values[elementNumber] : null;
+    }
+
+    private void nullElement(int i)
+    {
+      nulls[i] = true;
+      longs[i] = 0L;
+    }
+
+    @Override
+    public long[] getLongVector()
+    {
+      // computeNumbers is a no-op when the vectors are already current for this offset id
+      computeNumbers();
+      return longs;
+    }
+
+    @Nullable
+    @Override
+    public boolean[] getNullVector()
+    {
+      computeNumbers();
+      return nulls;
+    }
+  }
+
+  /**
+   * {@link BaseFloatVectorValueSelector} exposing one fixed position of each array read from an array typed
+   * {@link VectorObjectSelector}.
+   * <p>
+   * {@link Number} elements are converted with {@link Number#floatValue()}. {@link String} elements are parsed with
+   * {@link Doubles#tryParse(String)} and the parsed double is converted with {@link Double#floatValue()}. Rows that
+   * are not arrays, arrays that are too short, and any other kind of element are reported as null with a stored
+   * value of 0.
+   * <p>
+   * Computed vectors are cached and only rebuilt when the id of the offset differs from the last id seen.
+   */
+  private static final class ArrayElementFloatVectorValueSelector extends BaseFloatVectorValueSelector
+  {
+    private final float[] floats;
+    private final boolean[] nulls;
+    private final VectorObjectSelector arraySelector;
+    private final int elementNumber;
+    private int id;
+
+    public ArrayElementFloatVectorValueSelector(
+        ReadableVectorOffset offset,
+        VectorObjectSelector arraySelector,
+        int elementNumber
+    )
+    {
+      super(offset);
+      this.arraySelector = arraySelector;
+      this.elementNumber = elementNumber;
+      floats = new float[offset.getMaxVectorSize()];
+      nulls = new boolean[offset.getMaxVectorSize()];
+      id = ReadableVectorInspector.NULL_ID;
+    }
+
+    /**
+     * Fills {@link #floats} and {@link #nulls} for the current vector, unless they were already computed for the
+     * current offset id.
+     */
+    private void computeNumbers()
+    {
+      if (offset.getId() == id) {
+        return;
+      }
+      final Object[] rows = arraySelector.getObjectVector();
+      for (int row = 0; row < arraySelector.getCurrentVectorSize(); row++) {
+        final Object element = elementAt(rows[row]);
+        if (element instanceof Number) {
+          floats[row] = ((Number) element).floatValue();
+          nulls[row] = false;
+          continue;
+        }
+        // only strings get a second chance at becoming a number, everything else (including null) is null
+        final Double parsed = element instanceof String ? Doubles.tryParse((String) element) : null;
+        if (parsed == null) {
+          nullElement(row);
+        } else {
+          floats[row] = parsed.floatValue();
+          nulls[row] = false;
+        }
+      }
+      id = offset.getId();
+    }
+
+    /**
+     * Returns the element at position {@link #elementNumber} when {@code row} is an array long enough to contain
+     * it, otherwise null.
+     */
+    @Nullable
+    private Object elementAt(Object row)
+    {
+      if (!(row instanceof Object[])) {
+        // not an array?
+        return null;
+      }
+      final Object[] values = (Object[]) row;
+      return elementNumber < values.length ? values[elementNumber] : null;
+    }
+
+    private void nullElement(int i)
+    {
+      nulls[i] = true;
+      floats[i] = 0f;
+    }
+
+    @Override
+    public float[] getFloatVector()
+    {
+      // computeNumbers is a no-op when the vectors are already current for this offset id
+      computeNumbers();
+      return floats;
+    }
+
+    @Override
+    public boolean[] getNullVector()
+    {
+      computeNumbers();
+      return nulls;
+    }
+  }
+
+  /**
+   * {@link BaseDoubleVectorValueSelector} exposing one fixed position of each array read from an array typed
+   * {@link VectorObjectSelector}.
+   * <p>
+   * {@link Number} elements are converted with {@link Number#doubleValue()}. {@link String} elements are parsed
+   * with {@link Doubles#tryParse(String)}. Rows that are not arrays, arrays that are too short, and any other kind
+   * of element are reported as null with a stored value of 0.
+   * <p>
+   * Computed vectors are cached and only rebuilt when the id of the offset differs from the last id seen.
+   */
+  private static final class ArrayElementDoubleVectorValueSelector extends BaseDoubleVectorValueSelector
+  {
+    private final double[] doubles;
+    private final boolean[] nulls;
+    private final VectorObjectSelector arraySelector;
+    private final int elementNumber;
+    private int id;
+
+    public ArrayElementDoubleVectorValueSelector(
+        ReadableVectorOffset offset,
+        VectorObjectSelector arraySelector,
+        int elementNumber
+    )
+    {
+      super(offset);
+      this.arraySelector = arraySelector;
+      this.elementNumber = elementNumber;
+      doubles = new double[offset.getMaxVectorSize()];
+      nulls = new boolean[offset.getMaxVectorSize()];
+      id = ReadableVectorInspector.NULL_ID;
+    }
+
+    /**
+     * Fills {@link #doubles} and {@link #nulls} for the current vector, unless they were already computed for the
+     * current offset id.
+     */
+    private void computeNumbers()
+    {
+      if (offset.getId() == id) {
+        return;
+      }
+      final Object[] rows = arraySelector.getObjectVector();
+      for (int row = 0; row < arraySelector.getCurrentVectorSize(); row++) {
+        final Object element = elementAt(rows[row]);
+        if (element instanceof Number) {
+          doubles[row] = ((Number) element).doubleValue();
+          nulls[row] = false;
+          continue;
+        }
+        // only strings get a second chance at becoming a number, everything else (including null) is null
+        final Double parsed = element instanceof String ? Doubles.tryParse((String) element) : null;
+        if (parsed == null) {
+          nullElement(row);
+        } else {
+          doubles[row] = parsed;
+          nulls[row] = false;
+        }
+      }
+      id = offset.getId();
+    }
+
+    /**
+     * Returns the element at position {@link #elementNumber} when {@code row} is an array long enough to contain
+     * it, otherwise null.
+     */
+    @Nullable
+    private Object elementAt(Object row)
+    {
+      if (!(row instanceof Object[])) {
+        // not an array?
+        return null;
+      }
+      final Object[] values = (Object[]) row;
+      return elementNumber < values.length ? values[elementNumber] : null;
+    }
+
+    private void nullElement(int i)
+    {
+      nulls[i] = true;
+      doubles[i] = 0.0;
+    }
+
+    @Override
+    public double[] getDoubleVector()
+    {
+      // computeNumbers is a no-op when the vectors are already current for this offset id
+      computeNumbers();
+      return doubles;
+    }
+
+    @Override
+    public boolean[] getNullVector()
+    {
+      computeNumbers();
+      return nulls;
+    }
+  }
+
+  /**
+   * {@link VectorObjectSelector} exposing one fixed position of each array read from an array typed
+   * {@link VectorObjectSelector}.
+   * <p>
+   * Each selected element is wrapped with {@link ExprEval#ofType} using the supplied element type and then cast to
+   * the supplied target type. Rows that are not arrays, or arrays too short to contain the requested position,
+   * produce null.
+   * <p>
+   * The computed vector is cached and only rebuilt when the id of the offset differs from the last id seen.
+   */
+  private static final class ArrayElementVectorObjectSelector implements VectorObjectSelector
+  {
+    private final Object[] elements;
+    private final VectorObjectSelector arraySelector;
+    private final ReadableVectorOffset offset;
+    private final int elementNumber;
+    private final ExpressionType elementType;
+    private final ExpressionType castTo;
+    private int id;
+
+    public ArrayElementVectorObjectSelector(
+        VectorObjectSelector arraySelector,
+        ReadableVectorOffset offset,
+        int elementNumber,
+        ExpressionType elementType,
+        ExpressionType castTo
+    )
+    {
+      this.arraySelector = arraySelector;
+      this.offset = offset;
+      this.elementNumber = elementNumber;
+      this.elementType = elementType;
+      this.castTo = castTo;
+      elements = new Object[arraySelector.getMaxVectorSize()];
+      id = ReadableVectorInspector.NULL_ID;
+    }
+
+    @Override
+    public Object[] getObjectVector()
+    {
+      if (offset.getId() == id) {
+        // already computed for this offset id
+        return elements;
+      }
+      final Object[] rows = arraySelector.getObjectVector();
+      for (int row = 0; row < arraySelector.getCurrentVectorSize(); row++) {
+        final Object candidate = rows[row];
+        Object extracted = null;
+        if (candidate instanceof Object[]) {
+          final Object[] values = (Object[]) candidate;
+          if (elementNumber < values.length) {
+            extracted = ExprEval.ofType(elementType, values[elementNumber]).castTo(castTo).value();
+          }
+        }
+        elements[row] = extracted;
+      }
+      id = offset.getId();
+      return elements;
+    }
+
+    @Override
+    public int getMaxVectorSize()
+    {
+      return arraySelector.getMaxVectorSize();
+    }
+
+    @Override
+    public int getCurrentVectorSize()
+    {
+      return arraySelector.getCurrentVectorSize();
+    }
+  }
+
+  /**
+   * {@link ColumnValueSelector} exposing one fixed position of the array read from an array typed
+   * {@link ColumnValueSelector}.
+   * <p>
+   * {@link #getObject()} returns the raw element, or null when the underlying value is not an array or is too short.
+   * The primitive getters only recognize {@link Number} elements: anything else makes {@link #isNull()} return true
+   * and the getters return zero.
+   */
+  private static final class ArrayElementColumnValueSelector implements ColumnValueSelector<Object>
+  {
+    private final ColumnValueSelector<?> arraySelector;
+    private final int elementNumber;
+
+    public ArrayElementColumnValueSelector(ColumnValueSelector<?> arraySelector, int elementNumber)
+    {
+      this.arraySelector = arraySelector;
+      this.elementNumber = elementNumber;
+    }
+
+    /**
+     * Returns the selected element when it is a {@link Number}, otherwise null.
+     */
+    @Nullable
+    private Number numberOrNull()
+    {
+      final Object element = getObject();
+      return element instanceof Number ? (Number) element : null;
+    }
+
+    @Nullable
+    @Override
+    public Object getObject()
+    {
+      final Object candidate = arraySelector.getObject();
+      if (!(candidate instanceof Object[])) {
+        return null;
+      }
+      final Object[] values = (Object[]) candidate;
+      return elementNumber < values.length ? values[elementNumber] : null;
+    }
+
+    @Override
+    public boolean isNull()
+    {
+      return numberOrNull() == null;
+    }
+
+    @Override
+    public long getLong()
+    {
+      final Number number = numberOrNull();
+      return number == null ? 0L : number.longValue();
+    }
+
+    @Override
+    public float getFloat()
+    {
+      final Number number = numberOrNull();
+      return number == null ? 0f : number.floatValue();
+    }
+
+    @Override
+    public double getDouble()
+    {
+      final Number number = numberOrNull();
+      return number == null ? 0.0 : number.doubleValue();
+    }
+
+    @Override
+    public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+    {
+      arraySelector.inspectRuntimeShape(inspector);
+    }
+
+    @Override
+    public Class<?> classOfObject()
+    {
+      return Object.class;
+    }
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/math/expr/EvalTest.java 
b/processing/src/test/java/org/apache/druid/math/expr/EvalTest.java
index b18b8cff3ea..e05f98c4540 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/EvalTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/EvalTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import junitparams.converters.Nullable;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.column.TypeStrategies;
 import org.apache.druid.segment.column.TypeStrategiesTest;
@@ -445,91 +444,73 @@ public class EvalTest extends InitializedNullHandlingTest
   @Test
   public void testStringArrayToScalarStringBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.STRING)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to 
[STRING]", t.getMessage());
+    ExprEval cast = ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.STRING);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.STRING, cast.type());
   }
 
   @Test
   public void testStringArrayToScalarLongBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.LONG)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to [LONG]", 
t.getMessage());
+    ExprEval cast = ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.LONG);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.LONG, cast.type());
   }
 
   @Test
   public void testStringArrayToScalarDoubleBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.DOUBLE)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to 
[DOUBLE]", t.getMessage());
+    ExprEval cast = ExprEval.ofStringArray(new String[]{"foo", 
"bar"}).castTo(ExpressionType.DOUBLE);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.DOUBLE, cast.type());
   }
 
   @Test
   public void testLongArrayToScalarStringBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.STRING)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<LONG>] to [STRING]", 
t.getMessage());
+    ExprEval cast = ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.STRING);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.STRING, cast.type());
   }
 
   @Test
   public void testLongArrayToScalarLongBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.LONG)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<LONG>] to [LONG]", 
t.getMessage());
+    ExprEval cast = ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.LONG);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.LONG, cast.type());
   }
 
   @Test
   public void testLongArrayToScalarDoubleBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.DOUBLE)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<LONG>] to [DOUBLE]", 
t.getMessage());
+    ExprEval cast = ExprEval.ofLongArray(new Long[]{1L, 
2L}).castTo(ExpressionType.DOUBLE);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.DOUBLE, cast.type());
   }
 
   @Test
   public void testDoubleArrayToScalarStringBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.STRING)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<DOUBLE>] to 
[STRING]", t.getMessage());
+    ExprEval cast = ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.STRING);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.STRING, cast.type());
   }
 
   @Test
   public void testDoubleArrayToScalarLongBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.LONG)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<DOUBLE>] to [LONG]", 
t.getMessage());
+    ExprEval cast = ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.LONG);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.LONG, cast.type());
   }
 
   @Test
   public void testDoubleArrayToScalarDoubleBadCast()
   {
-    Throwable t = Assert.assertThrows(
-        IAE.class,
-        () -> ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.DOUBLE)
-    );
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<DOUBLE>] to 
[DOUBLE]", t.getMessage());
+    ExprEval cast = ExprEval.ofDoubleArray(new Double[]{1.1, 
2.2}).castTo(ExpressionType.DOUBLE);
+    Assert.assertNull(cast.value());
+    Assert.assertEquals(ExpressionType.DOUBLE, cast.type());
   }
 
   @Test
diff --git 
a/processing/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java 
b/processing/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java
index e624ed04e50..528d4844bf1 100644
--- a/processing/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java
+++ b/processing/src/test/java/org/apache/druid/math/expr/ExprEvalTest.java
@@ -704,15 +704,14 @@ public class ExprEvalTest extends 
InitializedNullHandlingTest
     cast = 
eval.castTo(ExpressionTypeFactory.getInstance().ofArray(ExpressionType.NESTED_DATA));
     Assert.assertArrayEquals(new Object[]{"1", "2", "foo", null, "3.3"}, 
(Object[]) cast.value());
 
-    ExprEval<?> finalEval = eval;
-    Throwable t = Assert.assertThrows(IAE.class, () -> 
finalEval.castTo(ExpressionType.LONG));
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to [LONG]", 
t.getMessage());
+    cast = eval.castTo(ExpressionType.LONG);
+    Assert.assertNull(cast.value());
 
-    t = Assert.assertThrows(IAE.class, () -> 
finalEval.castTo(ExpressionType.DOUBLE));
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to 
[DOUBLE]", t.getMessage());
+    cast = eval.castTo(ExpressionType.DOUBLE);
+    Assert.assertNull(cast.value());
 
-    t = Assert.assertThrows(IAE.class, () -> 
finalEval.castTo(ExpressionType.STRING));
-    Assert.assertEquals("Invalid type, cannot cast [ARRAY<STRING>] to 
[STRING]", t.getMessage());
+    cast = eval.castTo(ExpressionType.STRING);
+    Assert.assertNull(cast.value());
 
     eval = ExprEval.ofType(ExpressionType.LONG_ARRAY, new Object[]{1234L});
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java
 
b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java
index 1ef233237ed..027b076d2de 100644
--- 
a/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/scan/NestedDataScanQueryTest.java
@@ -780,7 +780,7 @@ public class NestedDataScanQueryTest extends 
InitializedNullHandlingTest
     Assert.assertEquals(1, resultsRealtime.size());
     Assert.assertEquals(resultsRealtime.size(), resultsSegments.size());
     Assert.assertEquals(
-        "[[1672531200000, null, null, null, 1, 51, -0.13, 1, [], [51, -35], 
{a=700, b={x=g, y=1.1, z=[9, null, 9, 9]}, c=null, v=[]}, {x=400, y=[{l=[null], 
m=100, n=5}, {l=[a, b, c], m=a, n=1}], z={}}, null, [a, b], null, [2, 3], null, 
[null], null, [1, 0, 1], null, [{x=1}, {x=2}], null, hello, 1234, 1.234, {x=1, 
y=hello, z={a=1.1, b=1234, c=[a, b, c], d=[]}}, [a, b, c], [1, 2, 3], [1.1, 
2.2, 3.3], [], {}, [null, null], [{}, {}, {}], [{a=b, x=1, y=1.3}], 1], 
[1672531200000, , 2, null, 0, [...]
+        "[[1672531200000, null, null, null, 1, 51, -0.13, 1, [], [51, -35], 
{a=700, b={x=g, y=1.1, z=[9, null, 9, 9]}, c=null, v=[]}, {x=400, y=[{l=[null], 
m=100, n=5}, {l=[a, b, c], m=a, n=1}], z={}}, null, [a, b], null, [2, 3], null, 
[null], null, [1, 0, 1], null, [{x=1}, {x=2}], null, hello, 1234, 1.234, {x=1, 
y=hello, z={a=1.1, b=1234, c=[a, b, c], d=[]}}, [a, b, c], [1, 2, 3], [1.1, 
2.2, 3.3], [], {}, [null, null], [{}, {}, {}], [{a=b, x=1, y=1.3}], 1], 
[1672531200000, , 2, null, 0, [...]
         resultsSegments.get(0).getEvents().toString()
     );
     Assert.assertEquals(resultsSegments.get(0).getEvents().toString(), 
resultsRealtime.get(0).getEvents().toString());
diff --git a/processing/src/test/java/org/apache/druid/segment/TestHelper.java 
b/processing/src/test/java/org/apache/druid/segment/TestHelper.java
index 23589ca065c..737ce8abfbd 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestHelper.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestHelper.java
@@ -395,7 +395,9 @@ public class TestHelper
 
       final Object actualValue = actualMap.get(key);
 
-      if (expectedValue != null && expectedValue.getClass().isArray()) {
+      if ((expectedValue != null && actualValue == null) || (expectedValue == 
null && actualValue != null)) {
+        Assert.assertEquals(StringUtils.format("%s: key[%s]", msg, key), 
expectedValue, actualValue);
+      } else if (expectedValue != null && expectedValue.getClass().isArray()) {
         Assert.assertArrayEquals((Object[]) expectedValue, (Object[]) 
actualValue);
       } else if (expectedValue instanceof Float || expectedValue instanceof 
Double) {
         Assert.assertEquals(
diff --git 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 2ed144e5552..818de015448 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -22,11 +22,22 @@ package org.apache.druid.segment.virtual;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import org.apache.druid.common.guava.SettableSupplier;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.segment.FrameSegment;
+import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
@@ -45,12 +56,21 @@ import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.IncrementalIndexSegment;
+import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.RowAdapters;
+import org.apache.druid.segment.RowBasedSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.TestObjectColumnSelector;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
@@ -61,13 +81,18 @@ import 
org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.DateTime;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -132,6 +157,10 @@ public class ExpressionSelectorsTest extends 
InitializedNullHandlingTest
     });
   }
 
+  @Rule
+  public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+
   @Test
   public void test_single_value_string_bindings()
   {
@@ -707,6 +736,385 @@ public class ExpressionSelectorsTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void testCastSelectors() throws IOException
+  {
+    final RowSignature rowSignature = RowSignature.builder()
+                                                  .add("string", 
ColumnType.STRING)
+                                                  .add("multiString", 
ColumnType.STRING)
+                                                  .add("long", ColumnType.LONG)
+                                                  .add("double", 
ColumnType.DOUBLE)
+                                                  .add("stringArray", 
ColumnType.STRING_ARRAY)
+                                                  .add("longArray", 
ColumnType.LONG_ARRAY)
+                                                  .add("doubleArray", 
ColumnType.DOUBLE_ARRAY)
+                                                  .build();
+    final DateTime start = DateTimes.nowUtc();
+    List<InputRow> rows = List.of(
+        new ListBasedInputRow(
+            rowSignature,
+            start,
+            rowSignature.getColumnNames(),
+            List.of(
+                "a",
+                List.of("a1", "a2"),
+                1L,
+                1.1,
+                new Object[]{"a1", "a2"},
+                new Object[]{1L, 1L},
+                new Object[]{1.1, 1.1}
+            )
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            start.plusMinutes(1),
+            rowSignature.getColumnNames(),
+            List.of(
+                "b",
+                List.of("b1"),
+                2L,
+                2.2,
+                new Object[]{"2.2"},
+                new Object[]{2L},
+                new Object[]{2.2}
+            )
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            start.plusMinutes(2),
+            rowSignature.getColumnNames(),
+            Lists.newArrayList(
+                null,
+                List.of(),
+                null,
+                null,
+                null,
+                null,
+                null
+            )
+        ),
+        new ListBasedInputRow(
+            rowSignature,
+            start.plusMinutes(3),
+            rowSignature.getColumnNames(),
+            List.of(
+                "4",
+                Lists.newArrayList(null, null, "4.4"),
+                4L,
+                4.4,
+                new Object[0],
+                new Object[0],
+                new Object[0]
+            )
+        )
+    );
+
+    final IncrementalIndexSchema schema =
+        IncrementalIndexSchema.builder()
+                              .withDimensionsSpec(
+                                  DimensionsSpec.builder()
+                                                .setDimensions(
+                                                    List.of(
+                                                        new 
StringDimensionSchema("string"),
+                                                        new 
StringDimensionSchema("multiString"),
+                                                        new 
LongDimensionSchema("long"),
+                                                        new 
DoubleDimensionSchema("double")
+                                                    )
+                                                )
+                                                .useSchemaDiscovery(true)
+                                                .build()
+                              )
+                              .build();
+
+    IndexBuilder bob = IndexBuilder.create()
+                                   .schema(schema)
+                                   .tmpDir(temporaryFolder.newFolder())
+                                   .rows(rows);
+
+    try (final Closer closer = Closer.create()) {
+      final List<Segment> segments = List.of(
+          new IncrementalIndexSegment(bob.buildIncrementalIndex(), 
SegmentId.dummy("test")),
+          new QueryableIndexSegment(bob.buildMMappedIndex(), 
SegmentId.dummy("test")),
+          new RowBasedSegment<>(Sequences.simple(rows), 
RowAdapters.standardRow(), RowSignature.empty()),
+          new RowBasedSegment<>(Sequences.simple(rows), 
RowAdapters.standardRow(), rowSignature),
+          FrameTestUtil.cursorFactoryToFrameSegment(
+              new QueryableIndexCursorFactory(bob.buildMMappedIndex()),
+              FrameType.ROW_BASED
+          ),
+          FrameTestUtil.cursorFactoryToFrameSegment(
+              new QueryableIndexCursorFactory(bob.buildMMappedIndex()),
+              FrameType.COLUMNAR
+          )
+      );
+
+      for (Segment segment : segments) {
+        final CursorFactory cursorFactory = segment.as(CursorFactory.class);
+        Assert.assertNotNull(cursorFactory);
+        final CursorHolder holder = 
closer.register(cursorFactory.makeCursorHolder(CursorBuildSpec.FULL_SCAN));
+
+        final Cursor cursor = holder.asCursor();
+        final Offset offset = new SimpleAscendingOffset(rows.size());
+
+        ColumnValueSelector baseStringSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("string");
+        ColumnValueSelector baseMultiStringSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("multiString");
+        ColumnValueSelector baseLongSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("long");
+        ColumnValueSelector baseDoubleSelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("double");
+        ColumnValueSelector baseStringArraySelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("stringArray");
+        ColumnValueSelector baseLongArraySelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("longArray");
+        ColumnValueSelector baseDoubleArraySelector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("doubleArray");
+
+        ColumnValueSelector stringSelectorToLong = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringSelector,
+            ColumnType.STRING,
+            ColumnType.LONG
+        );
+        ColumnValueSelector stringSelectorToDouble = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringSelector,
+            ColumnType.STRING,
+            ColumnType.DOUBLE
+        );
+        ColumnValueSelector stringToArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringSelector,
+            ColumnType.STRING,
+            ColumnType.STRING_ARRAY
+        );
+
+        ColumnValueSelector multiStringToStringSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseMultiStringSelector,
+            ColumnType.STRING,
+            ColumnType.STRING
+        );
+
+        ColumnValueSelector multiStringToStringArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseMultiStringSelector,
+            ColumnType.STRING,
+            ColumnType.STRING_ARRAY
+        );
+
+        ColumnValueSelector longToStringSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseLongSelector,
+            ColumnType.LONG,
+            ColumnType.STRING
+        );
+
+        ColumnValueSelector longToDoubleArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseLongSelector,
+            ColumnType.LONG,
+            ColumnType.DOUBLE_ARRAY
+        );
+
+        ColumnValueSelector doubleToStringSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseDoubleSelector,
+            ColumnType.DOUBLE,
+            ColumnType.STRING
+        );
+
+        ColumnValueSelector doubleToLongArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseDoubleSelector,
+            ColumnType.DOUBLE,
+            ColumnType.LONG_ARRAY
+        );
+
+        ColumnValueSelector stringArrayToStringSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringArraySelector,
+            ColumnType.STRING_ARRAY,
+            ColumnType.STRING
+        );
+
+        ColumnValueSelector stringArrayToLongArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringArraySelector,
+            ColumnType.STRING_ARRAY,
+            ColumnType.LONG_ARRAY
+        );
+
+        ColumnValueSelector stringArrayToDoubleSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseStringArraySelector,
+            ColumnType.STRING_ARRAY,
+            ColumnType.DOUBLE
+        );
+
+        ColumnValueSelector longArrayToStringSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseLongArraySelector,
+            ColumnType.LONG_ARRAY,
+            ColumnType.STRING
+        );
+
+        ColumnValueSelector longArrayToDoubleArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseLongArraySelector,
+            ColumnType.LONG_ARRAY,
+            ColumnType.DOUBLE_ARRAY
+        );
+
+        ColumnValueSelector doubleArrayToStringArraySelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseDoubleArraySelector,
+            ColumnType.DOUBLE_ARRAY,
+            ColumnType.STRING_ARRAY
+        );
+
+        ColumnValueSelector doubleArrayToLongSelector = 
ExpressionSelectors.castColumnValueSelector(
+            offset::getOffset,
+            baseDoubleArraySelector,
+            ColumnType.DOUBLE_ARRAY,
+            ColumnType.LONG
+        );
+
+        // first row
+        // "a"
+        Assert.assertNull(stringSelectorToLong.getObject());
+        Assert.assertNull(stringSelectorToDouble.getObject());
+        Assert.assertArrayEquals(new Object[]{"a"}, (Object[]) 
stringToArraySelector.getObject());
+
+        // ["a1", "a2"]
+        Assert.assertNull(multiStringToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{"a1", "a2"}, (Object[]) 
multiStringToStringArraySelector.getObject());
+
+        // 1
+        Assert.assertEquals("1", longToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{1.0}, (Object[]) 
longToDoubleArraySelector.getObject());
+
+        // 1.1
+        Assert.assertEquals("1.1", doubleToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{1L}, (Object[]) 
doubleToLongArraySelector.getObject());
+
+        // ["a1", "a2"]
+        Assert.assertNull(stringArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{null, null}, (Object[]) 
stringArrayToLongArraySelector.getObject());
+        Assert.assertNull(stringArrayToDoubleSelector.getObject());
+
+        // [1, 1]
+        Assert.assertNull(longArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{1.0, 1.0}, (Object[]) 
longArrayToDoubleArraySelector.getObject());
+
+        // [1.1, 1.1]
+        Assert.assertArrayEquals(new Object[]{"1.1", "1.1"}, (Object[]) 
doubleArrayToStringArraySelector.getObject());
+        Assert.assertNull(doubleArrayToLongSelector.getObject());
+
+        cursor.advance();
+        offset.increment();
+
+        // second row
+        // "b"
+        Assert.assertNull(stringSelectorToLong.getObject());
+        Assert.assertNull(stringSelectorToDouble.getObject());
+        Assert.assertArrayEquals(new Object[]{"b"}, (Object[]) 
stringToArraySelector.getObject());
+
+        // ["b1"]
+        Assert.assertEquals("b1", multiStringToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{"b1"}, (Object[]) 
multiStringToStringArraySelector.getObject());
+
+        // 2
+        Assert.assertEquals("2", longToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{2.0}, (Object[]) 
longToDoubleArraySelector.getObject());
+
+        // 2.2
+        Assert.assertEquals("2.2", doubleToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{2L}, (Object[]) 
doubleToLongArraySelector.getObject());
+
+        // ["2.2"]
+        Assert.assertEquals("2.2", stringArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{2L}, (Object[]) 
stringArrayToLongArraySelector.getObject());
+        Assert.assertEquals(2.2, stringArrayToDoubleSelector.getObject());
+
+        // [2]
+        Assert.assertEquals("2", longArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{2.0}, (Object[]) 
longArrayToDoubleArraySelector.getObject());
+
+        // [2.2]
+        Assert.assertArrayEquals(new Object[]{"2.2"}, (Object[]) 
doubleArrayToStringArraySelector.getObject());
+        Assert.assertEquals(2L, doubleArrayToLongSelector.getObject());
+
+        cursor.advance();
+        offset.increment();
+
+
+        // third row
+        // null
+        Assert.assertNull(stringSelectorToLong.getObject());
+        Assert.assertNull(stringSelectorToDouble.getObject());
+        Assert.assertNull(stringToArraySelector.getObject());
+
+        // []
+        Assert.assertNull(multiStringToStringSelector.getObject());
+        if (segment instanceof IncrementalIndexSegment || segment instanceof 
QueryableIndexSegment || segment instanceof FrameSegment) {
+          Assert.assertNull(multiStringToStringSelector.getObject());
+        } else {
+          // this one is kind of weird, the row based segment selector does 
not convert the empty list into null like
+          // the others do
+          Assert.assertArrayEquals(new Object[0], (Object[]) 
multiStringToStringArraySelector.getObject());
+        }
+
+        // null
+        Assert.assertNull(longToStringSelector.getObject());
+        Assert.assertNull(longToDoubleArraySelector.getObject());
+
+        // null
+        Assert.assertNull(doubleToStringSelector.getObject());
+        Assert.assertNull(doubleToLongArraySelector.getObject());
+
+        // null
+        Assert.assertNull(stringArrayToStringSelector.getObject());
+        Assert.assertNull(stringArrayToLongArraySelector.getObject());
+        Assert.assertNull(stringArrayToDoubleSelector.getObject());
+
+        // null
+        Assert.assertNull(longArrayToStringSelector.getObject());
+        Assert.assertNull(longArrayToDoubleArraySelector.getObject());
+
+        // null
+        Assert.assertNull(doubleArrayToStringArraySelector.getObject());
+        Assert.assertNull(doubleArrayToLongSelector.getObject());
+
+        cursor.advance();
+        offset.increment();
+
+        // fourth row
+        // "4"
+        Assert.assertEquals(4L, stringSelectorToLong.getObject());
+        Assert.assertEquals(4.0, stringSelectorToDouble.getObject());
+        Assert.assertArrayEquals(new Object[]{"4"}, (Object[]) 
stringToArraySelector.getObject());
+
+        // [null, null, 4.4]
+        Assert.assertNull(multiStringToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{null, null, "4.4"}, (Object[]) 
multiStringToStringArraySelector.getObject());
+
+        // 4
+        Assert.assertEquals("4", longToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{4.0}, (Object[]) 
longToDoubleArraySelector.getObject());
+
+        // 4.4
+        Assert.assertEquals("4.4", doubleToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[]{4L}, (Object[]) 
doubleToLongArraySelector.getObject());
+
+        // []
+        Assert.assertNull(stringArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[0], (Object[]) 
stringArrayToLongArraySelector.getObject());
+        Assert.assertNull(stringArrayToDoubleSelector.getObject());
+
+        // []
+        Assert.assertNull(longArrayToStringSelector.getObject());
+        Assert.assertArrayEquals(new Object[0], (Object[]) 
longArrayToDoubleArraySelector.getObject());
+
+        // []
+        Assert.assertArrayEquals(new Object[0], (Object[]) 
doubleArrayToStringArraySelector.getObject());
+        Assert.assertNull(doubleArrayToLongSelector.getObject());
+      }
+    }
+  }
+
   private static DimensionSelector dimensionSelectorFromSupplier(
       final Supplier<String> supplier
   )
diff --git a/processing/src/test/resources/nested-all-types-test-data.json 
b/processing/src/test/resources/nested-all-types-test-data.json
index 5572c72799b..e8d4a4ac26e 100644
--- a/processing/src/test/resources/nested-all-types-test-data.json
+++ b/processing/src/test/resources/nested-all-types-test-data.json
@@ -1,4 +1,4 @@
-{"timestamp": "2023-01-01T00:00:00", "str":"a",    "long":1,     "double":1.0, 
 "bool": true,  "variant": 1,       "variantNumeric": 1,     
"variantEmptyObj":1,   "variantEmtpyArray":1,  "variantWithArrays": 1,         
"obj":{"a": 100, "b": {"x": "a", "y": 1.1, "z": [1, 2, 3, 4]}, "c": 100, "v": 
[]},     "complexObj":{"x": 1234, "y": [{"l": ["a", "b", "c"], "m": "a", "n": 
1},{"l": ["a", "b", "c"], "m": "a", "n": 1}], "z": {"a": [1.1, 2.2, 3.3], "b": 
true}}, "arrayString": ["a", "b"],     [...]
+{"timestamp": "2023-01-01T00:00:00", "str":"a",    "long":1,     "double":1.0, 
 "bool": true,  "variant": 1,       "variantNumeric": 1,     
"variantEmptyObj":1,   "variantEmtpyArray":1,  "variantWithArrays": 1,         
"obj":{"a": 100, "b": {"x": "a", "y": 1.1, "z": [1, 2, 3, 4]}, "c": [100], "v": 
[]},   "complexObj":{"x": 1234, "y": [{"l": ["a", "b", "c"], "m": "a", "n": 
1},{"l": ["a", "b", "c"], "m": "a", "n": 1}], "z": {"a": [1.1, 2.2, 3.3], "b": 
true}}, "arrayString": ["a", "b"],     [...]
 {"timestamp": "2023-01-01T00:00:00", "str":"",     "long":2,                   
 "bool": false, "variant": "b",     "variantNumeric": 1.1,   
"variantEmptyObj":"b", "variantEmtpyArray":2,  "variantWithArrays": "b",       
"obj":{"a": 200, "b": {"x": "b", "y": 1.1, "z": [2, 4, 6]}, "c": ["a", "b"], 
"v": []}, "complexObj":{"x": 10, "y": [{"l": ["b", "b", "c"], "m": "b", "n": 
2}, [1, 2, 3]], "z": {"a": [5.5], "b": false}},                                 
         "arrayString": ["a", "b", "c"] [...]
 {"timestamp": "2023-01-01T00:00:00", "str":"null", "long":3,     "double":2.0, 
                "variant": 3.0,     "variantNumeric": 1.0,   
"variantEmptyObj":3.3, "variantEmtpyArray":3,  "variantWithArrays": 3.0,       
"obj":{"a": 300},                                                               
       "complexObj":{"x": 4.4, "y": [{"l": [], "m": 100, "n": 3},{"l": ["a"]}, 
{"l": ["b"], "n": []}], "z": {"a": [], "b": true}},                             
    "arrayString": ["b", "c"],     [...]
 {"timestamp": "2023-01-01T00:00:00", "str":"b",    "long":4,     "double":3.3, 
 "bool": true,  "variant": "1",                              
"variantEmptyObj":{},  "variantEmtpyArray":4,  "variantWithArrays": "1",       
"obj":{"a": 400, "b": {"x": "d", "y": 1.1, "z": [3, 4]}, "c": {"a": 1},"v": 
[]},       "complexObj":{"x": 1234, "z": {"a": [1.1, 2.2, 3.3], "b": true}},    
                                                                                
       "arrayString": ["d", "e"],     [...]
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 7aab9d34837..afae66b062d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -1502,7 +1502,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
           outprint(col);
           outprint("F");
         } else if (col instanceof Object[]) {
-          printArray(array);
+          printArray((Object[]) col);
         } else if (col instanceof List) {
           printList((List<?>) col);
         } else {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
index 8c525918ab8..42367523646 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
@@ -6075,7 +6075,7 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
                 "1",
                 "1",
                 "1",
-                
"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"c\":100,\"v\":[]}",
+                
"{\"a\":100,\"b\":{\"x\":\"a\",\"y\":1.1,\"z\":[1,2,3,4]},\"c\":[100],\"v\":[]}",
                 
"{\"x\":1234,\"y\":[{\"l\":[\"a\",\"b\",\"c\"],\"m\":\"a\",\"n\":1},{\"l\":[\"a\",\"b\",\"c\"],\"m\":\"a\",\"n\":1}],\"z\":{\"a\":[1.1,2.2,3.3],\"b\":true}}",
                 "[\"a\",\"b\"]",
                 "[\"a\",\"b\"]",
@@ -6801,44 +6801,93 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
   @Test
   public void testJsonValueNestedEmptyArray()
   {
-    // The data set has empty arrays, however MSQ returns nulls. The root 
cause of the issue is the incorrect
-    // capabilities returned by NestedFieldVirtualColumn when planning which 
causes MSQ to treat the nested path
-    // as STRING, even though it is an array.
-    msqIncompatible();
-    // test for regression
     skipVectorize();
-    testQuery(
-        "SELECT json_value(cObj, '$.z.d') FROM druid.all_auto",
-        ImmutableList.of(
-            Druids.newScanQueryBuilder()
-                  .dataSource(DATA_SOURCE_ALL)
-                  .intervals(querySegmentSpec(Filtration.eternity()))
-                  .columns("v0")
-                  .columnTypes(ColumnType.STRING)
-                  .virtualColumns(
-                      new NestedFieldVirtualColumn(
-                          "cObj",
-                          "$.z.d",
-                          "v0",
-                          ColumnType.STRING
+    testBuilder()
+        .sql("SELECT json_value(cObj, '$.z.d') FROM druid.all_auto")
+        .queryContext(QUERY_CONTEXT_NO_STRINGIFY_ARRAY)
+        .expectedQueries(
+            ImmutableList.of(
+                Druids.newScanQueryBuilder()
+                      .dataSource(DATA_SOURCE_ALL)
+                      .intervals(querySegmentSpec(Filtration.eternity()))
+                      .columns("v0")
+                      .columnTypes(ColumnType.STRING)
+                      .virtualColumns(
+                          new NestedFieldVirtualColumn(
+                              "cObj",
+                              "$.z.d",
+                              "v0",
+                              ColumnType.STRING
+                          )
                       )
-                  )
-                  
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
-                  .build()
-        ),
-        ImmutableList.of(
-            new Object[]{"[]"},
-            new Object[]{"[]"},
-            new Object[]{"[]"},
-            new Object[]{"[]"},
-            new Object[]{"[]"},
-            new Object[]{"[]"},
-            new Object[]{"[]"}
-        ),
-        RowSignature.builder()
-                    .add("EXPR$0", ColumnType.STRING)
-                    .build()
-    );
+                      .context(QUERY_CONTEXT_NO_STRINGIFY_ARRAY)
+                      
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                      .build()
+            )
+        )
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null}
+            )
+        )
+        .expectedSignature(
+            RowSignature.builder()
+                        .add("EXPR$0", ColumnType.STRING)
+                        .build()
+        )
+        .run();
+  }
+
+  @Test
+  public void testJsonValueNestedEmptyArrayReturning()
+  {
+    skipVectorize();
+    testBuilder()
+        .sql("SELECT json_value(cObj, '$.z.d' returning varchar array) FROM 
druid.all_auto")
+        .queryContext(QUERY_CONTEXT_NO_STRINGIFY_ARRAY)
+        .expectedQueries(
+            ImmutableList.of(
+                Druids.newScanQueryBuilder()
+                      .dataSource(DATA_SOURCE_ALL)
+                      .intervals(querySegmentSpec(Filtration.eternity()))
+                      .columns("v0")
+                      .columnTypes(ColumnType.STRING_ARRAY)
+                      .virtualColumns(
+                          new NestedFieldVirtualColumn(
+                              "cObj",
+                              "$.z.d",
+                              "v0",
+                              ColumnType.STRING_ARRAY
+                          )
+                      )
+                      .context(QUERY_CONTEXT_NO_STRINGIFY_ARRAY)
+                      
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                      .build()
+            )
+        )
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()},
+                new Object[]{Collections.emptyList()}
+            )
+        )
+        .expectedSignature(
+            RowSignature.builder()
+                        .add("EXPR$0", ColumnType.STRING_ARRAY)
+                        .build()
+        )
+        .run();
   }
 
   @Test
@@ -7265,7 +7314,7 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
   public void testSumPathWithArrays()
   {
     /*
-    "obj":{... "c": 100, ...}
+    "obj":{... "c": [100], ...}
     "obj":{... "c": ["a", "b"], ...}
     "obj":{...}
     "obj":{... "c": {"a": 1}, ...},
@@ -7300,7 +7349,7 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
   public void testCountPathWithArrays()
   {
     /*
-    "obj":{... "c": 100, ...}
+    "obj":{... "c": [100], ...}
     "obj":{... "c": ["a", "b"], ...}
     "obj":{...}
     "obj":{... "c": {"a": 1}, ...},
@@ -7342,11 +7391,53 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testCountPathWithArraysReturningNumber()
+  {
+    /*
+    "obj":{... "c": [100], ...}
+    "obj":{... "c": ["a", "b"], ...}
+    "obj":{...}
+    "obj":{... "c": {"a": 1}, ...},
+    "obj":{... "c": "hello", ...},
+    "obj":{... "c": 12.3, ...},
+    "obj":{... "c": null, ...},
+     */
+    testQuery(
+        "SELECT "
+        + "COUNT(JSON_VALUE(obj, '$.c' RETURNING DOUBLE)) "
+        + "FROM druid.all_auto",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(DATA_SOURCE_ALL)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(new NestedFieldVirtualColumn("obj", "$.c", 
"v0", ColumnType.DOUBLE))
+                  .aggregators(
+                      aggregators(
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a0"),
+                              not(isNull("v0"))
+                          )
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{2L}
+        ),
+        RowSignature.builder()
+                    .add("EXPR$0", ColumnType.LONG)
+                    .build()
+    );
+  }
+
   @Test
   public void testCountPathWithArraysReturning()
   {
     /*
-    "obj":{... "c": 100, ...}
+    "obj":{... "c": [100], ...}
     "obj":{... "c": ["a", "b"], ...}
     "obj":{...}
     "obj":{... "c": {"a": 1}, ...},
@@ -7383,4 +7474,173 @@ public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
                     .build()
     );
   }
+
+  @Test
+  public void testSumPathWithArraysRealtime()
+  {
+    /*
+    "obj":{... "c": [100], ...}
+    "obj":{... "c": ["a", "b"], ...}
+    "obj":{...}
+    "obj":{... "c": {"a": 1}, ...},
+    "obj":{... "c": "hello", ...},
+    "obj":{... "c": 12.3, ...},
+    "obj":{... "c": null, ...},
+     */
+    skipVectorize();
+    testQuery(
+        "SELECT "
+        + "SUM(JSON_VALUE(obj, '$.c')) "
+        + "FROM druid.all_auto_realtime",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(DATA_SOURCE_ALL_REALTIME)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(new NestedFieldVirtualColumn("obj", "$.c", "v0", ColumnType.DOUBLE))
+                  .aggregators(aggregators(new DoubleSumAggregatorFactory("a0", "v0")))
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{112.3d}
+        ),
+        RowSignature.builder()
+                    .add("EXPR$0", ColumnType.DOUBLE)
+                    .build()
+    );
+  }
+
+  @Test
+  public void testCountPathWithArraysRealtime()
+  {
+    /*
+    "obj":{... "c": [100], ...}
+    "obj":{... "c": ["a", "b"], ...}
+    "obj":{...}
+    "obj":{... "c": {"a": 1}, ...},
+    "obj":{... "c": "hello", ...},
+    "obj":{... "c": 12.3, ...},
+    "obj":{... "c": null, ...},
+     */
+    // capturing existing behavior... the count should be 4 if it was counting all non-null primitive values, but that
+    // would mean that the virtual column would need to plan as ARRAY<STRING> expected type instead of STRING
+    // ... you might notice there are actually 5 non-null obj.c values, however json_value only returns primitive
+    // values, so the object row is rightfully skipped
+    skipVectorize();
+    testQuery(
+        "SELECT "
+        + "COUNT(JSON_VALUE(obj, '$.c')) "
+        + "FROM druid.all_auto_realtime",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(DATA_SOURCE_ALL_REALTIME)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(new NestedFieldVirtualColumn("obj", "$.c", "v0", ColumnType.STRING))
+                  .aggregators(
+                      aggregators(
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a0"),
+                              not(isNull("v0"))
+                          )
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{3L}
+        ),
+        RowSignature.builder()
+                    .add("EXPR$0", ColumnType.LONG)
+                    .build()
+    );
+  }
+
+  @Test
+  public void testCountPathWithArraysReturningNumberRealtime()
+  {
+    /*
+    "obj":{... "c": [100], ...}
+    "obj":{... "c": ["a", "b"], ...}
+    "obj":{...}
+    "obj":{... "c": {"a": 1}, ...},
+    "obj":{... "c": "hello", ...},
+    "obj":{... "c": 12.3, ...},
+    "obj":{... "c": null, ...},
+     */
+    skipVectorize();
+    testQuery(
+        "SELECT "
+        + "COUNT(JSON_VALUE(obj, '$.c' RETURNING DOUBLE)) "
+        + "FROM druid.all_auto_realtime",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(DATA_SOURCE_ALL_REALTIME)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(new NestedFieldVirtualColumn("obj", "$.c", "v0", ColumnType.DOUBLE))
+                  .aggregators(
+                      aggregators(
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a0"),
+                              not(isNull("v0"))
+                          )
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{2L}
+        ),
+        RowSignature.builder()
+                    .add("EXPR$0", ColumnType.LONG)
+                    .build()
+    );
+  }
+
+  @Test
+  public void testCountPathWithArraysReturningRealtime()
+  {
+    /*
+    "obj":{... "c": [100], ...}
+    "obj":{... "c": ["a", "b"], ...}
+    "obj":{...}
+    "obj":{... "c": {"a": 1}, ...},
+    "obj":{... "c": "hello", ...},
+    "obj":{... "c": 12.3, ...},
+    "obj":{... "c": null, ...},
+     */
+    skipVectorize();
+    testQuery(
+        "SELECT "
+        + "COUNT(JSON_VALUE(obj, '$.c' RETURNING VARCHAR ARRAY)) "
+        + "FROM druid.all_auto_realtime",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(DATA_SOURCE_ALL_REALTIME)
+                  .intervals(querySegmentSpec(Filtration.eternity()))
+                  .granularity(Granularities.ALL)
+                  .virtualColumns(new NestedFieldVirtualColumn("obj", "$.c", "v0", ColumnType.STRING_ARRAY))
+                  .aggregators(
+                      aggregators(
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a0"),
+                              not(isNull("v0"))
+                          )
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(
+            new Object[]{4L}
+        ),
+        RowSignature.builder()
+                    .add("EXPR$0", ColumnType.LONG)
+                    .build()
+    );
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to