jihoonson commented on a change in pull request #10401:
URL: https://github.com/apache/druid/pull/10401#discussion_r491719340



##########
File path: core/src/main/java/org/apache/druid/math/expr/Expr.java
##########
@@ -148,6 +170,60 @@ default ExprType getOutputType(InputBindingTypes 
inputTypes)
   {
     @Nullable
     ExprType getType(String name);
+
+    default boolean areNumeric(List<Expr> args)
+    {
+      boolean numeric = args.size() > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;
+    }
+
+    default boolean areNumeric(Expr... args)
+    {
+      boolean numeric = args.length > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;
+    }
+
+    default boolean canVectorize(List<Expr> args)
+    {
+      boolean canVectorize = true;
+      for (Expr arg : args) {
+        canVectorize &= arg.canVectorize(this);
+      }
+      return canVectorize;
+    }
+
+    default boolean canVectorize(Expr... args)
+    {
+      boolean canVectorize = true;
+      for (Expr arg : args) {
+        canVectorize &= arg.canVectorize(this);
+      }
+      return canVectorize;
+    }
+  }
+
+  /**
+   * {@link InputBindingTypes} + vectorizations stuff for {@link 
#buildVectorized}
+   */
+  interface VectorInputBindingTypes extends InputBindingTypes

Review comment:
       Hmm, what is the reason for splitting `VectorInputBindingTypes` and 
`VectorInputBinding`? The latter extends the former and there is only one 
implementation of the latter in this PR. Can `VectorInputBinding` extend 
`InputBindingTypes` and `VectorSizeInspector` instead? `VectorSizeInspector` 
needs to be moved in that case though as it is currently in `processing`.

##########
File path: core/src/main/java/org/apache/druid/math/expr/ExprEval.java
##########
@@ -121,6 +121,23 @@ public static ExprEval bestEffortOf(@Nullable Object val)
     return new StringExprEval(val == null ? null : String.valueOf(val));
   }
 
+  @Nullable
+  public static Number computeNumber(@Nullable String value)

Review comment:
       nit: maybe `Numbers` is a better home.

##########
File path: core/src/main/java/org/apache/druid/math/expr/ExprEval.java
##########
@@ -121,6 +121,23 @@ public static ExprEval bestEffortOf(@Nullable Object val)
     return new StringExprEval(val == null ? null : String.valueOf(val));
   }
 
+  @Nullable
+  public static Number computeNumber(@Nullable String value)
+  {
+    if (value == null) {
+      return null;
+    }
+    Number rv;
+    Long v = GuavaUtils.tryParseLong(value);

Review comment:
       Heh, we have a couple of similar methods such as 
`Numbers.parseLongObject()`, `GuavaUtils.tryParseLong()`, etc. We should 
perhaps clean up them by merging similar methods later.

##########
File path: core/src/main/java/org/apache/druid/math/expr/Expr.java
##########
@@ -148,6 +170,60 @@ default ExprType getOutputType(InputBindingTypes 
inputTypes)
   {
     @Nullable
     ExprType getType(String name);
+
+    default boolean areNumeric(List<Expr> args)
+    {
+      boolean numeric = args.size() > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;
+    }
+
+    default boolean areNumeric(Expr... args)
+    {
+      boolean numeric = args.length > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;
+    }
+
+    default boolean canVectorize(List<Expr> args)
+    {
+      boolean canVectorize = true;
+      for (Expr arg : args) {
+        canVectorize &= arg.canVectorize(this);
+      }
+      return canVectorize;
+    }
+
+    default boolean canVectorize(Expr... args)
+    {
+      boolean canVectorize = true;

Review comment:
       Same here. Can be `canVectorize(Arrays.asList(args))`.

##########
File path: core/src/main/java/org/apache/druid/math/expr/Expr.java
##########
@@ -148,6 +170,60 @@ default ExprType getOutputType(InputBindingTypes 
inputTypes)
   {
     @Nullable
     ExprType getType(String name);
+
+    default boolean areNumeric(List<Expr> args)
+    {
+      boolean numeric = args.size() > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;
+    }
+
+    default boolean areNumeric(Expr... args)
+    {
+      boolean numeric = args.length > 0;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          numeric = false;
+          break;
+        }
+        numeric &= argType.isNumeric();
+      }
+      return numeric;

Review comment:
       Can be simplified to `areNumeric(Arrays.asList(args))`.

##########
File path: 
core/src/test/java/org/apache/druid/math/expr/VectorExprSanityTest.java
##########
@@ -0,0 +1,459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.math.expr;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.vector.ExprEvalVector;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BooleanSupplier;
+import java.util.function.DoubleSupplier;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+
+/**
+ * randomize inputs to various vector expressions and make sure the results 
match nonvectorized expressions
+ *
+ * this is not a replacement for correctness tests, but will ensure that 
vectorized and non-vectorized expression
+ * evaluation is at least self consistent...
+ */
+public class VectorExprSanityTest extends InitializedNullHandlingTest

Review comment:
       Nice tests.

##########
File path: 
core/src/main/java/org/apache/druid/math/expr/vector/UnivariateFunctionVectorProcessor.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.math.expr.vector;
+
+import org.apache.druid.math.expr.Expr;
+
+/**
+ * common machinery for processing single input operators and functions, which 
should always treat null input as null
+ * output, and are backed by a primitive value instead of an object value (and 
need to use the null vector instead of
+ * checking the vector itself for nulls)

Review comment:
       Maybe better to explicitly mention that this class is not for string 
vectors?

##########
File path: 
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
##########
@@ -129,13 +139,74 @@ public DimensionSelector makeDimensionSelector(
     return ExpressionSelectors.makeColumnValueSelector(factory, 
parsedExpression.get());
   }
 
+  @Override
+  public boolean canVectorize(ColumnInspector inspector)
+  {
+    final ExpressionPlan plan = ExpressionPlanner.plan(inspector, 
parsedExpression.get());
+    return plan.is(ExpressionPlan.Trait.VECTORIZABLE);
+  }
+
+  @Override
+  public VectorValueSelector makeVectorValueSelector(String columnName, 
VectorColumnSelectorFactory factory)
+  {
+    return ExpressionVectorSelectors.makeVectorValueSelector(factory, 
parsedExpression.get());
+  }
+
+  @Override
+  public VectorObjectSelector makeVectorObjectSelector(String columnName, 
VectorColumnSelectorFactory factory)
+  {
+    return ExpressionVectorSelectors.makeVectorObjectSelector(factory, 
parsedExpression.get());
+  }
+
   @Override
   public ColumnCapabilities capabilities(String columnName)
   {
-    // Note: Ideally we would fill out additional information instead of 
leaving capabilities as 'unknown', e.g. examine
-    // if the expression in question could potentially return multiple values 
and anything else. However, we don't
-    // currently have a good way of determining this, so fill this out more 
once we do
-    return new ColumnCapabilitiesImpl().setType(outputType);
+    // If possible, this should only be used as a fallback method for when 
capabilities are truly 'unknown', because we
+    // are unable to compute the output type of the expression, either due to 
incomplete type information of the
+    // inputs or because of unimplemented methods on expression 
implementations themselves, or, because a
+    // ColumnInspector is not available
+    return new ColumnCapabilitiesImpl().setType(outputType == null ? 
ValueType.FLOAT : outputType);
+  }
+
+  @Override
+  public ColumnCapabilities capabilities(ColumnInspector inspector, String 
columnName)
+  {
+    final ExpressionPlan plan = ExpressionPlanner.plan(inspector, 
parsedExpression.get());
+
+    if (plan.getOutputType() != null) {
+
+      if (outputType != null && ExprType.fromValueType(outputType) != 
plan.getOutputType()) {
+        log.warn(
+            "Projected output type %s of expression %s does not match provided 
type %s",
+            plan.getOutputType(),
+            expression,
+            outputType
+        );
+      }
+      final ExprType inferredOutputType = plan.getOutputType();
+      final ValueType valueType = ExprType.toValueType(inferredOutputType);
+      if (valueType.isNumeric()) {
+        // if float was explicitly specified preserve it, because it will 
currently never be the computed output type
+        if (ValueType.FLOAT == outputType) {
+          return 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT);
+        }
+        return 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ExprType.toValueType(inferredOutputType));
+      }
+
+      // we don't have to check for unknown input here because output type is 
unable to be inferred if we don't know
+      // the complete set of input types
+      if (plan.any(ExpressionPlan.Trait.NON_SCALAR_OUTPUT, 
ExpressionPlan.Trait.NEEDS_APPLIED)) {

Review comment:
       Should `hasMultipleValues` be set when the plan has the 
`NON_SCALAR_OUTPUT` trait?

##########
File path: 
core/src/main/java/org/apache/druid/math/expr/BinaryLogicalOperatorExpr.java
##########
@@ -68,6 +70,17 @@ public ExprType getOutputType(InputBindingTypes inputTypes)
     }
     return implicitCast;
   }
+  @Override
+  public boolean canVectorize(InputBindingTypes inputTypes)
+  {
+    return inputTypes.areNumeric(left, right) && inputTypes.canVectorize(left, 
right);

Review comment:
       This code seems duplicate in binary operators, but I guess it would be 
nice to keep both `canVectorize()` and `buildVectorized()` together in the same 
class.

##########
File path: 
processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
##########
@@ -225,6 +229,22 @@ public boolean isNull()
     }
   }
 
+  public static VectorValueSelector makeVectorValueSelector(
+      VectorColumnSelectorFactory columnSelectorFactory,
+      String fieldName,
+      String expression,

Review comment:
       Please annotate these with `@Nullable`.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
##########
@@ -129,13 +139,74 @@ public DimensionSelector makeDimensionSelector(
     return ExpressionSelectors.makeColumnValueSelector(factory, 
parsedExpression.get());
   }
 
+  @Override
+  public boolean canVectorize(ColumnInspector inspector)
+  {
+    final ExpressionPlan plan = ExpressionPlanner.plan(inspector, 
parsedExpression.get());
+    return plan.is(ExpressionPlan.Trait.VECTORIZABLE);
+  }
+
+  @Override
+  public VectorValueSelector makeVectorValueSelector(String columnName, 
VectorColumnSelectorFactory factory)
+  {
+    return ExpressionVectorSelectors.makeVectorValueSelector(factory, 
parsedExpression.get());
+  }
+
+  @Override
+  public VectorObjectSelector makeVectorObjectSelector(String columnName, 
VectorColumnSelectorFactory factory)
+  {
+    return ExpressionVectorSelectors.makeVectorObjectSelector(factory, 
parsedExpression.get());
+  }
+
   @Override
   public ColumnCapabilities capabilities(String columnName)
   {
-    // Note: Ideally we would fill out additional information instead of 
leaving capabilities as 'unknown', e.g. examine
-    // if the expression in question could potentially return multiple values 
and anything else. However, we don't
-    // currently have a good way of determining this, so fill this out more 
once we do
-    return new ColumnCapabilitiesImpl().setType(outputType);
+    // If possible, this should only be used as a fallback method for when 
capabilities are truly 'unknown', because we
+    // are unable to compute the output type of the expression, either due to 
incomplete type information of the
+    // inputs or because of unimplemented methods on expression 
implementations themselves, or, because a
+    // ColumnInspector is not available
+    return new ColumnCapabilitiesImpl().setType(outputType == null ? 
ValueType.FLOAT : outputType);
+  }
+
+  @Override
+  public ColumnCapabilities capabilities(ColumnInspector inspector, String 
columnName)
+  {
+    final ExpressionPlan plan = ExpressionPlanner.plan(inspector, 
parsedExpression.get());
+
+    if (plan.getOutputType() != null) {
+
+      if (outputType != null && ExprType.fromValueType(outputType) != 
plan.getOutputType()) {
+        log.warn(
+            "Projected output type %s of expression %s does not match provided 
type %s",
+            plan.getOutputType(),
+            expression,
+            outputType
+        );
+      }
+      final ExprType inferredOutputType = plan.getOutputType();
+      final ValueType valueType = ExprType.toValueType(inferredOutputType);
+      if (valueType.isNumeric()) {
+        // if float was explicitly specified preserve it, because it will 
currently never be the computed output type
+        if (ValueType.FLOAT == outputType) {
+          return 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT);
+        }
+        return 
ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ExprType.toValueType(inferredOutputType));

Review comment:
       nit: duplicate `ExprType.toValueType(inferredOutputType)`.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
+
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ExpressionPlanner
+{
+  private ExpressionPlanner()
+  {
+    // No instantiation.
+  }
+
+  /**
+   * Druid tries to be chill to expressions to make up for not having a well 
defined table schema across segments. This
+   * method performs some analysis to determine what sort of selectors can be 
constructed on top of an expression,
+   * whether or not the expression will need implicitly mapped across 
multi-valued inputs, if the expression produces
+   * multi-valued outputs, is vectorizable, and everything else interesting 
when making a selector.
+   *
+   * Results are stored in a {@link ExpressionPlan}, which can be examined to 
do whatever is necessary to make things
+   * function properly.
+   */
+  public static ExpressionPlan plan(ColumnInspector inspector, Expr expression)
+  {
+    final Expr.BindingAnalysis analysis = expression.analyzeInputs();
+    Parser.validateExpr(expression, analysis);
+
+    EnumSet<ExpressionPlan.Trait> traits = 
EnumSet.noneOf(ExpressionPlan.Trait.class);
+    Set<String> maybeMultiValued = new HashSet<>();
+    List<String> needsApplied = ImmutableList.of();
+    ValueType singleInputType = null;
+    ExprType outputType = null;
+
+    final Set<String> columns = analysis.getRequiredBindings();
+
+    // check and set traits which allow optimized selectors to be created
+    if (columns.isEmpty()) {
+      traits.add(ExpressionPlan.Trait.CONSTANT);
+    } else if (columns.size() == 1) {
+      final String column = Iterables.getOnlyElement(columns);
+      final ColumnCapabilities capabilities = 
inspector.getColumnCapabilities(column);
+
+      // These flags allow for selectors that wrap a single underlying column 
to be optimized, through caching results
+      // and via allowing deferred execution in the case of building dimension 
selectors.
+      //    SINGLE_INPUT_SCALAR
+      // is set if an input is single valued, and the output is definitely 
single valued, with an additional requirement
+      // for strings that the column is dictionary encoded.
+      //    SINGLE_INPUT_MAPPABLE
+      // is set when a single input string column, which can be multi-valued, 
but if so, it must be implicitly mappable
+      // (i.e. the expression is not treating its input as an array and not 
wanting to output an array)
+      if (capabilities != null) {
+        boolean isSingleInputMappable = false;
+        boolean isSingleInputScalar = 
capabilities.hasMultipleValues().isFalse() &&
+                                      !analysis.hasInputArrays() &&
+                                      !analysis.isOutputArray();
+        if (capabilities.getType() == ValueType.STRING) {
+          isSingleInputScalar &= capabilities.isDictionaryEncoded().isTrue();
+          isSingleInputMappable = capabilities.isDictionaryEncoded().isTrue() 
&&
+                                  
!capabilities.hasMultipleValues().isUnknown() &&

Review comment:
       Hmm, is this correct? Should this be 
`capabilities.hasMultipleValues().isMaybeTrue()` instead?

##########
File path: core/src/main/java/org/apache/druid/math/expr/Function.java
##########
@@ -517,6 +532,24 @@ public ExprEval apply(List<Expr> args, Expr.ObjectBinding 
bindings)
 
       return ExprEval.of(retVal);
     }
+
+    @Override
+    public boolean canVectorize(Expr.InputBindingTypes inputTypes, List<Expr> 
args)
+    {
+      return (args.size() == 1 || (args.get(1).isLiteral() && 
args.get(1).getLiteralValue() instanceof Number)) &&
+             inputTypes.canVectorize(args);
+    }
+
+    @Override
+    public <T> ExprVectorProcessor<T> 
asVectorProcessor(Expr.VectorInputBindingTypes inputTypes, List<Expr> args)
+    {
+      if (args.size() == 1 || args.get(1).isLiteral()) {
+        final int radix = args.size() == 1 ? 10 : ((Number) 
args.get(1).getLiteralValue()).intValue();
+        return VectorProcessors.parseLong(inputTypes, args.get(0), radix);
+      }
+      // not yet implemented, how did we get here

Review comment:
       Is this something that should be resolved in this PR?

##########
File path: 
core/src/main/java/org/apache/druid/math/expr/vector/VectorProcessors.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.math.expr.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprType;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+
+public class VectorProcessors
+{
+  public static <T> ExprVectorProcessor<T> constantString(@Nullable String 
constant, int maxVectorSize)
+  {
+    final String[] strings = new String[maxVectorSize];
+    Arrays.fill(strings, constant);
+    final ExprEvalStringVector eval = new ExprEvalStringVector(strings);
+    return new ExprVectorProcessor<T>()
+    {
+      @Override
+      public ExprEvalVector<T> evalVector(Expr.VectorInputBinding bindings)
+      {
+        return (ExprEvalVector<T>) eval;
+      }
+
+      @Override
+      public ExprType getOutputType()
+      {
+        return ExprType.STRING;
+      }
+    };
+  }
+
+  public static <T> ExprVectorProcessor<T> constantDouble(@Nullable Double 
constant, int maxVectorSize)
+  {
+    final double[] doubles = new double[maxVectorSize];
+    final boolean[] nulls;
+    if (constant == null) {
+      nulls = new boolean[maxVectorSize];
+      Arrays.fill(nulls, true);
+    } else {
+      nulls = null;
+      Arrays.fill(doubles, constant);
+    }
+    final ExprEvalDoubleVector eval = new ExprEvalDoubleVector(doubles, nulls);
+    return new ExprVectorProcessor<T>()
+    {
+      @Override
+      public ExprEvalVector<T> evalVector(Expr.VectorInputBinding bindings)
+      {
+        return (ExprEvalVector<T>) eval;
+      }
+
+      @Override
+      public ExprType getOutputType()
+      {
+        return ExprType.DOUBLE;
+      }
+    };
+  }
+
+  public static <T> ExprVectorProcessor<T> constantLong(@Nullable Long 
constant, int maxVectorSize)
+  {
+    final long[] longs = new long[maxVectorSize];
+    final boolean[] nulls;
+    if (constant == null) {
+      nulls = new boolean[maxVectorSize];
+      Arrays.fill(nulls, true);
+    } else {
+      nulls = null;
+      Arrays.fill(longs, constant);
+    }
+    final ExprEvalLongVector eval = new ExprEvalLongVector(longs, nulls);
+    return new ExprVectorProcessor<T>()
+    {
+      @Override
+      public ExprEvalVector<T> evalVector(Expr.VectorInputBinding bindings)
+      {
+        return (ExprEvalVector<T>) eval;
+      }
+
+      @Override
+      public ExprType getOutputType()
+      {
+        return ExprType.LONG;
+      }
+    };
+  }
+
+  public static <T> ExprVectorProcessor<T> 
parseLong(Expr.VectorInputBindingTypes inputTypes, Expr arg, int radix)
+  {
+    final ExprVectorProcessor<?> processor = new 
LongOutStringInFunctionVectorProcessor(
+        CastToTypeVectorProcessor.castToType(arg.buildVectorized(inputTypes), 
ExprType.STRING),
+        inputTypes.getMaxVectorSize()
+    )
+    {
+      @Override
+      public void processIndex(String[] strings, long[] longs, boolean[] 
outputNulls, int i)
+      {
+        try {
+          longs[i] = Long.parseLong(strings[i], radix);
+          outputNulls[i] = false;

Review comment:
       Should this logic match to its [non-vectorized 
version](https://github.com/apache/druid/blob/master/core/src/main/java/org/apache/druid/math/expr/Function.java#L507-L512)?

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.query;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.DruidPlanner;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.PlannerFactory;
+import org.apache.druid.sql.calcite.planner.PlannerResult;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmark that tests various SQL queries.
+ */
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+public class SqlExpressionBenchmark
+{
+  private static final Logger log = new Logger(SqlExpressionBenchmark.class);
+
+  static {
+    NullHandling.initializeForTests();
+    Calcites.setSystemProperties();
+  }
+
+  private static final DruidProcessingConfig PROCESSING_CONFIG = new 
DruidProcessingConfig()
+  {
+    @Override
+    public int intermediateComputeSizeBytes()
+    {
+      return 512 * 1024 * 1024;
+    }
+
+    @Override
+    public int getNumMergeBuffers()
+    {
+      return 3;
+    }
+
+    @Override
+    public int getNumThreads()
+    {
+      return 1;
+    }
+
+    @Override
+    public boolean useParallelMergePoolConfigured()
+    {
+      return true;
+    }
+
+    @Override
+    public String getFormatString()
+    {
+      return "benchmarks-processing-%s";
+    }
+  };
+
+
+  private static final List<String> QUERIES = ImmutableList.of(
+      // ===========================
+      // non-expression reference queries
+      // ===========================
+      // 0: non-expression timeseries reference, 1 columns
+      "SELECT SUM(long1) FROM foo",
+      // 1: non-expression timeseries reference, 2 columns
+      "SELECT SUM(long1), SUM(long2) FROM foo",
+      // 2: non-expression timeseries reference, 3 columns
+      "SELECT SUM(long1), SUM(long4), SUM(double1) FROM foo",
+      // 3: non-expression timeseries reference, 4 columns
+      "SELECT SUM(long1), SUM(long4), SUM(double1), SUM(float3) FROM foo",
+      // 4: non-expression timeseries reference, 5 columns
+      "SELECT SUM(long1), SUM(long4), SUM(double1), SUM(float3), SUM(long5) 
FROM foo",
+      // 5: group by non-expr with 1 agg
+      "SELECT string2, SUM(long1) FROM foo GROUP BY 1 ORDER BY 2",
+      // 6: group by non-expr with 2 agg
+      "SELECT string2, SUM(long1), SUM(double3) FROM foo GROUP BY 1 ORDER BY 
2",
+      // ===========================
+      // expressions
+      // ===========================
+      // 7: math op - 2 longs
+      "SELECT SUM(long1 * long2) FROM foo",
+      // 8: mixed math - 2 longs, 1 double
+      "SELECT SUM((long1 * long2) / double1) FROM foo",
+      // 9: mixed math - 2 longs, 1 double, 1 float
+      "SELECT SUM(float3 + ((long1 * long4)/double1)) FROM foo",
+      // 10: mixed math - 3 longs, 1 double, 1 float
+      "SELECT SUM(long5 - (float3 + ((long1 * long4)/double1))) FROM foo",
+      // 11: all same math op - 3 longs, 1 double, 1 float
+      "SELECT SUM(long5 * float3 * long1 * long4 * double1) FROM foo",
+      // 12: cos
+      "SELECT cos(double2) FROM foo",
+      // 13: unary negate
+      "SELECT SUM(-long4) FROM foo",
+      // 14: string long
+      "SELECT SUM(PARSE_LONG(string1)) FROM foo",
+      // 15: string longer
+      "SELECT SUM(PARSE_LONG(string3)) FROM foo",
+      // 16: time floor, non-expr col + reg agg
+      "SELECT TIME_FLOOR(__time, 'PT1H'), string2, SUM(double4) FROM foo GROUP 
BY 1,2 ORDER BY 3",
+      // 17: time floor, non-expr col + expr agg
+      "SELECT TIME_FLOOR(__time, 'PT1H'), string2, SUM(long1 * double4) FROM 
foo GROUP BY 1,2 ORDER BY 3",
+      // 18: time floor + non-expr agg (timeseries) (non-expression reference)
+      "SELECT TIME_FLOOR(__time, 'PT1H'), SUM(long1) FROM foo GROUP BY 1 ORDER 
BY 1",
+      // 19: time floor + expr agg (timeseries)
+      "SELECT TIME_FLOOR(__time, 'PT1H'), SUM(long1 * long4) FROM foo GROUP BY 
1 ORDER BY 1",
+      // 20: time floor + non-expr agg (group by)
+      "SELECT TIME_FLOOR(__time, 'PT1H'), SUM(long1) FROM foo GROUP BY 1 ORDER 
BY 2",
+      // 21: time floor + expr agg (group by)
+      "SELECT TIME_FLOOR(__time, 'PT1H'), SUM(long1 * long4) FROM foo GROUP BY 
1 ORDER BY 2",
+      // 22: time floor offset by 1 day + non-expr agg (group by)
+      "SELECT TIME_FLOOR(TIMESTAMPADD(DAY, -1, __time), 'PT1H'), SUM(long1) 
FROM foo GROUP BY 1 ORDER BY 1",
+      // 23: time floor offset by 1 day + expr agg (group by)
+      "SELECT TIME_FLOOR(TIMESTAMPADD(DAY, -1, __time), 'PT1H'), SUM(long1 * 
long4) FROM foo GROUP BY 1 ORDER BY 1",
+      // 24: group by long expr with non-expr agg
+      "SELECT (long1 * long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
+      // 25: group by non-expr with expr agg
+      "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2"
+  );
+
+  @Param({"5000000"})
+  private int rowsPerSegment;
+
+  @Param({"false", "force"})
+  private String vectorize;
+
+  @Param({
+      // non-expression reference
+      "0",
+      "1",
+      "2",
+      "3",
+      "4",
+      "5",
+      "6",
+      // expressions
+      "7",
+      "8",
+      "9",
+      "10",
+      "11",
+      "12",
+      "13",
+      "14",
+      "15",
+      "16",
+      "17",
+      "18",
+      "19",
+      "20",
+      "21",
+      "22",
+      "23",
+      "24",
+      "25"
+  })
+  private String query;
+
+  @Nullable
+  private PlannerFactory plannerFactory;
+  private Closer closer = Closer.create();
+
+  @Setup(Level.Trial)
+  public void setup() throws Exception
+  {
+    final GeneratorSchemaInfo schemaInfo = 
GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               
.interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new 
LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+
+    final PlannerConfig plannerConfig = new PlannerConfig();
+
+    final SegmentGenerator segmentGenerator = closer.register(new 
SegmentGenerator());
+    log.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", 
segmentGenerator.getCacheDir(), rowsPerSegment);
+    final QueryableIndex index = segmentGenerator.generate(dataSegment, 
schemaInfo, Granularities.NONE, rowsPerSegment);
+
+    final QueryRunnerFactoryConglomerate conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(
+        closer,
+        PROCESSING_CONFIG
+    );
+
+    final SpecificSegmentsQuerySegmentWalker walker = new 
SpecificSegmentsQuerySegmentWalker(conglomerate).add(
+        dataSegment,
+        index
+    );
+    closer.register(walker);
+
+    final SchemaPlus rootSchema =
+        CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, 
AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+    plannerFactory = new PlannerFactory(
+        rootSchema,
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        CalciteTests.createOperatorTable(),
+        CalciteTests.createExprMacroTable(),
+        plannerConfig,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        CalciteTests.getJsonMapper(),
+        CalciteTests.DRUID_SCHEMA_NAME
+    );
+
+    checkSanity();
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() throws Exception
+  {
+    closer.close();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void querySql(Blackhole blackhole) throws Exception
+  {
+    final Map<String, Object> context = ImmutableMap.of("vectorize", 
vectorize);
+    final AuthenticationResult authenticationResult = 
NoopEscalator.getInstance()
+                                                                   
.createEscalatedAuthenticationResult();
+    try (final DruidPlanner planner = plannerFactory.createPlanner(context, 
ImmutableList.of(), authenticationResult)) {
+      final PlannerResult plannerResult = 
planner.plan(QUERIES.get(Integer.parseInt(query)));
+      final Sequence<Object[]> resultSequence = plannerResult.run();
+      final Object[] lastRow = resultSequence.accumulate(null, (accumulated, 
in) -> in);
+      blackhole.consume(lastRow);
+    }
+  }
+
+  public void checkSanity() throws Exception

Review comment:
       Maybe we should add this testing for the benchmark queries in 
`CalciteQueryTest` so that CI can run?

##########
File path: processing/src/main/java/org/apache/druid/segment/VirtualColumn.java
##########
@@ -246,6 +246,12 @@ default VectorObjectSelector makeVectorObjectSelector(
    */
   ColumnCapabilities capabilities(String columnName);
 
+
+  default ColumnCapabilities capabilities(ColumnInspector inspector, String 
columnName)

Review comment:
       Please add javadoc for this method and update the javadoc of the other 
`capabilities(String columnName)`. Also, should we deprecate the other one if 
we want to eventually use it only as a fallback? 

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ExpressionVectorSelectorBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  @Param({"1000000"})
+  private int rowsPerSegment;
+
+  @Param({"false", "true"})
+  private boolean vectorize;
+
+  @Param({
+      "long1 * long2",
+      "double1 * double3",
+      "float1 + float3",
+      "(long1 - long4) / double3",
+      "max(double3, double5)",
+      "min(double4, double1)",
+      "cos(float3)",
+      "sin(long4)",
+      "parse_long(string1)",
+      "parse_long(string1) * double3",
+      "parse_long(string5) * parse_long(string1)",
+      "parse_long(string5) * parse_long(string1) * double3"
+  })
+  private String expression;
+
+  private QueryableIndex index;
+  private Closer closer;
+
+  @Nullable
+  private ExprType outputType;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    this.closer = Closer.create();
+
+    final GeneratorSchemaInfo schemaInfo = 
GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               
.interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new 
LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+
+    final SegmentGenerator segmentGenerator = closer.register(new 
SegmentGenerator());
+    this.index = closer.register(
+        segmentGenerator.generate(dataSegment, schemaInfo, Granularities.HOUR, 
rowsPerSegment)
+    );
+
+    Expr parsed = Parser.parse(expression, ExprMacroTable.nil());
+    outputType = parsed.getOutputType(
+        new ColumnInspector()
+        {
+          @Nullable
+          @Override
+          public ColumnCapabilities getColumnCapabilities(String column)
+          {
+            return QueryableIndexStorageAdapter.getColumnCapabilities(index, 
column);
+          }
+        }
+    );
+    checkSanity();
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() throws Exception
+  {
+    closer.close();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void scan(Blackhole blackhole)
+  {
+    final VirtualColumns virtualColumns = VirtualColumns.create(
+        ImmutableList.of(
+            new ExpressionVirtualColumn(
+                "v",
+                expression,
+                ExprType.toValueType(outputType),
+                TestExprMacroTable.INSTANCE
+            )
+        )
+    );
+    if (vectorize) {
+      VectorCursor cursor = new 
QueryableIndexStorageAdapter(index).makeVectorCursor(
+          null,
+          index.getDataInterval(),
+          virtualColumns,
+          false,
+          512,
+          null
+      );
+      if (outputType.isNumeric()) {
+        VectorValueSelector selector = 
cursor.getColumnSelectorFactory().makeValueSelector("v");
+        if (outputType.equals(ExprType.DOUBLE)) {
+          while (!cursor.isDone()) {
+            blackhole.consume(selector.getDoubleVector());
+            blackhole.consume(selector.getNullVector());
+            cursor.advance();
+          }
+        } else {
+          while (!cursor.isDone()) {
+            blackhole.consume(selector.getLongVector());
+            blackhole.consume(selector.getNullVector());
+            cursor.advance();
+          }
+        }
+        closer.register(cursor);
+      }
+    } else {
+      Sequence<Cursor> cursors = new 
QueryableIndexStorageAdapter(index).makeCursors(
+          null,
+          index.getDataInterval(),
+          virtualColumns,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+      int rowCount = cursors
+          .map(cursor -> {
+            final ColumnValueSelector selector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
+            int rows = 0;
+            while (!cursor.isDone()) {
+              blackhole.consume(selector.getObject());
+              rows++;
+              cursor.advance();
+            }
+            return rows;
+          }).accumulate(0, (acc, in) -> acc + in);
+
+      blackhole.consume(rowCount);
+    }
+  }
+
+  private void checkSanity()
+  {
+    final List<Object> results = new ArrayList<>(rowsPerSegment);
+    final VirtualColumns virtualColumns = VirtualColumns.create(
+        ImmutableList.of(
+            new ExpressionVirtualColumn(
+                "v",
+                expression,
+                ExprType.toValueType(outputType),
+                TestExprMacroTable.INSTANCE
+            )
+        )
+    );
+    VectorCursor cursor = new 
QueryableIndexStorageAdapter(index).makeVectorCursor(
+        null,
+        index.getDataInterval(),
+        virtualColumns,
+        false,
+        512,
+        null
+    );
+
+    VectorValueSelector selector = null;
+    VectorObjectSelector objectSelector = null;
+    if (outputType.isNumeric()) {
+      selector = cursor.getColumnSelectorFactory().makeValueSelector("v");
+    } else {
+      objectSelector = 
cursor.getColumnSelectorFactory().makeObjectSelector("v");
+    }
+    int rowCount = 0;
+    while (!cursor.isDone()) {
+      boolean[] nulls;
+      switch (outputType) {
+        case LONG:
+          nulls = selector.getNullVector();
+          long[] longs = selector.getLongVector();
+          for (int i = 0; i < selector.getCurrentVectorSize(); i++, 
rowCount++) {
+            results.add(nulls != null && nulls[i] ? null : longs[i]);
+          }
+          break;
+        case DOUBLE:
+          nulls = selector.getNullVector();
+          double[] doubles = selector.getDoubleVector();
+          for (int i = 0; i < selector.getCurrentVectorSize(); i++, 
rowCount++) {
+            results.add(nulls != null && nulls[i] ? null : doubles[i]);
+          }
+          break;
+        case STRING:
+          Object[] objects = objectSelector.getObjectVector();
+          for (int i = 0; i < objectSelector.getCurrentVectorSize(); i++, 
rowCount++) {
+            results.add(objects[i]);
+          }
+          break;
+      }
+
+      cursor.advance();
+    }
+    closer.register(cursor);
+
+    Sequence<Cursor> cursors = new 
QueryableIndexStorageAdapter(index).makeCursors(
+        null,
+        index.getDataInterval(),
+        virtualColumns,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    int rowCountCursor = cursors
+        .map(nonVectorized -> {
+          final ColumnValueSelector nonSelector = 
nonVectorized.getColumnSelectorFactory().makeColumnValueSelector("v");
+          int rows = 0;
+          while (!nonVectorized.isDone()) {
+            Assert.assertEquals(StringUtils.format("Failed at row %s", rows), 
nonSelector.getObject(), results.get(rows));
+            rows++;
+            nonVectorized.advance();
+          }
+          return rows;
+        }).accumulate(0, (acc, in) -> acc + in);
+
+    Assert.assertTrue(rowCountCursor > 0);

Review comment:
       Did you intend to check if `rowCount` and `RowCountCursor` match?

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)

Review comment:
       Are warmups and measurements too small?

##########
File path: 
processing/src/main/java/org/apache/druid/segment/ColumnInspector.java
##########
@@ -34,4 +36,15 @@
    */
   @Nullable
   ColumnCapabilities getColumnCapabilities(String column);
+
+  @Nullable
+  @Override
+  default ExprType getType(String name)

Review comment:
       nit: I guess we will want to keep this method until we merge `ExprType` 
and `ValueType`. Myabe `getExprType()` better to be more clear?

##########
File path: 
processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorValueSelector.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.vector.ExprVectorProcessor;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+
+public class ExpressionVectorValueSelector implements VectorValueSelector
+{
+  final Expr.VectorInputBinding bindings;
+  final ExprVectorProcessor<?> processor;
+  final float[] floats;

Review comment:
       Maybe good to mention that `float` is default for a historical reason?

##########
File path: 
benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionVectorSelectorBenchmark.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.Assert;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ExpressionVectorSelectorBenchmark
+{
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  @Param({"1000000"})
+  private int rowsPerSegment;
+
+  @Param({"false", "true"})
+  private boolean vectorize;
+
+  @Param({
+      "long1 * long2",
+      "double1 * double3",
+      "float1 + float3",
+      "(long1 - long4) / double3",
+      "max(double3, double5)",
+      "min(double4, double1)",
+      "cos(float3)",
+      "sin(long4)",
+      "parse_long(string1)",
+      "parse_long(string1) * double3",
+      "parse_long(string5) * parse_long(string1)",
+      "parse_long(string5) * parse_long(string1) * double3"
+  })
+  private String expression;
+
+  private QueryableIndex index;
+  private Closer closer;
+
+  @Nullable
+  private ExprType outputType;
+
+  @Setup(Level.Trial)
+  public void setup()
+  {
+    this.closer = Closer.create();
+
+    final GeneratorSchemaInfo schemaInfo = 
GeneratorBasicSchemas.SCHEMA_MAP.get("expression-testbench");
+
+    final DataSegment dataSegment = DataSegment.builder()
+                                               .dataSource("foo")
+                                               
.interval(schemaInfo.getDataInterval())
+                                               .version("1")
+                                               .shardSpec(new 
LinearShardSpec(0))
+                                               .size(0)
+                                               .build();
+
+    final SegmentGenerator segmentGenerator = closer.register(new 
SegmentGenerator());
+    this.index = closer.register(
+        segmentGenerator.generate(dataSegment, schemaInfo, Granularities.HOUR, 
rowsPerSegment)
+    );
+
+    Expr parsed = Parser.parse(expression, ExprMacroTable.nil());
+    outputType = parsed.getOutputType(
+        new ColumnInspector()
+        {
+          @Nullable
+          @Override
+          public ColumnCapabilities getColumnCapabilities(String column)
+          {
+            return QueryableIndexStorageAdapter.getColumnCapabilities(index, 
column);
+          }
+        }
+    );
+    checkSanity();
+  }
+
+  @TearDown(Level.Trial)
+  public void tearDown() throws Exception
+  {
+    closer.close();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public void scan(Blackhole blackhole)
+  {
+    final VirtualColumns virtualColumns = VirtualColumns.create(
+        ImmutableList.of(
+            new ExpressionVirtualColumn(
+                "v",
+                expression,
+                ExprType.toValueType(outputType),
+                TestExprMacroTable.INSTANCE
+            )
+        )
+    );
+    if (vectorize) {
+      VectorCursor cursor = new 
QueryableIndexStorageAdapter(index).makeVectorCursor(
+          null,
+          index.getDataInterval(),
+          virtualColumns,
+          false,
+          512,
+          null
+      );
+      if (outputType.isNumeric()) {
+        VectorValueSelector selector = 
cursor.getColumnSelectorFactory().makeValueSelector("v");
+        if (outputType.equals(ExprType.DOUBLE)) {
+          while (!cursor.isDone()) {
+            blackhole.consume(selector.getDoubleVector());
+            blackhole.consume(selector.getNullVector());
+            cursor.advance();
+          }
+        } else {
+          while (!cursor.isDone()) {
+            blackhole.consume(selector.getLongVector());
+            blackhole.consume(selector.getNullVector());
+            cursor.advance();
+          }
+        }
+        closer.register(cursor);
+      }
+    } else {
+      Sequence<Cursor> cursors = new 
QueryableIndexStorageAdapter(index).makeCursors(
+          null,
+          index.getDataInterval(),
+          virtualColumns,
+          Granularities.ALL,
+          false,
+          null
+      );
+
+      int rowCount = cursors
+          .map(cursor -> {
+            final ColumnValueSelector selector = 
cursor.getColumnSelectorFactory().makeColumnValueSelector("v");
+            int rows = 0;
+            while (!cursor.isDone()) {
+              blackhole.consume(selector.getObject());
+              rows++;
+              cursor.advance();
+            }
+            return rows;
+          }).accumulate(0, (acc, in) -> acc + in);
+
+      blackhole.consume(rowCount);
+    }
+  }
+
+  private void checkSanity()

Review comment:
       Looking at what this method does, it seems pretty useful. What do you 
think about making this test a unit test, so that CI can run? Or, if we already 
have enough unit tests which cover the same logic, I guess we don't need this 
to make the benchmark faster.

##########
File path: 
processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
##########
@@ -157,6 +158,24 @@ public int indexOf(final String columnName)
     return columnPositions.applyAsInt(columnName);
   }
 
+  public ColumnInspector asColumnInspector()

Review comment:
       Hmm, should `RowSignature` simply implement `ColumnInspector`?

##########
File path: core/src/main/java/org/apache/druid/math/expr/Function.java
##########
@@ -517,6 +532,24 @@ public ExprEval apply(List<Expr> args, Expr.ObjectBinding 
bindings)
 
       return ExprEval.of(retVal);
     }
+
+    @Override
+    public boolean canVectorize(Expr.InputBindingTypes inputTypes, List<Expr> 
args)
+    {
+      return (args.size() == 1 || (args.get(1).isLiteral() && 
args.get(1).getLiteralValue() instanceof Number)) &&
+             inputTypes.canVectorize(args);
+    }
+
+    @Override
+    public <T> ExprVectorProcessor<T> 
asVectorProcessor(Expr.VectorInputBindingTypes inputTypes, List<Expr> args)
+    {
+      if (args.size() == 1 || args.get(1).isLiteral()) {
+        final int radix = args.size() == 1 ? 10 : ((Number) 
args.get(1).getLiteralValue()).intValue();
+        return VectorProcessors.parseLong(inputTypes, args.get(0), radix);
+      }
+      // not yet implemented, how did we get here

Review comment:
       Could you add these details in the comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to