Copilot commented on code in PR #17659: URL: https://github.com/apache/pinot/pull/17659#discussion_r2894622117
########## pinot-core/src/main/java/org/apache/pinot/core/function/scalar/FilterMvScalarFunction.java: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.function.scalar; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.EnumMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.FunctionInfo; +import org.apache.pinot.common.function.PinotScalarFunction; +import org.apache.pinot.common.function.sql.PinotSqlFunction; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.operator.transform.function.FilterMvPredicateEvaluator; +import org.apache.pinot.spi.annotations.ScalarFunction; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Scalar wrapper for filterMv so FunctionRegistry can expose type signatures for query planning and execution paths + * that resolve scalar functions. 
+ */ +@ScalarFunction(names = {"filterMv"}) +public class FilterMvScalarFunction implements PinotScalarFunction { + private static final int MAX_CACHED_EVALUATORS = 10_000; + private static final Map<ColumnDataType, FunctionInfo> TYPE_FUNCTION_INFO_MAP = + new EnumMap<>(ColumnDataType.class); + private static final Cache<CacheKey, FilterMvPredicateEvaluator> EVALUATOR_CACHE = + CacheBuilder.newBuilder().maximumSize(MAX_CACHED_EVALUATORS).build(); + + static { + try { + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.INT_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", int[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.LONG_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", long[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.FLOAT_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", float[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.DOUBLE_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", double[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.STRING_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", String[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.BYTES_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", byte[][].class, String.class), + FilterMvScalarFunction.class, false)); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getName() { + return "filterMv"; + } + + @Override + public Set<String> getNames() { + return Set.of("filterMv"); + } + + @Nullable + @Override + public PinotSqlFunction toPinotSqlFunction() { + // Should already be registered in 
PinotOperatorTable by the transform function implementation + return null; + } + + @Nullable + @Override + public FunctionInfo getFunctionInfo(ColumnDataType[] argumentTypes) { + if (argumentTypes.length != 2) { + return null; + } + if (argumentTypes[1] != ColumnDataType.STRING) { + return null; + } + return TYPE_FUNCTION_INFO_MAP.get(argumentTypes[0].getStoredType()); + } + + @Nullable + @Override + public FunctionInfo getFunctionInfo(int numArguments) { + if (numArguments != 2) { + return null; + } + // Fall back to string + return getFunctionInfo(new ColumnDataType[]{ColumnDataType.STRING_ARRAY, ColumnDataType.STRING}); + } + + public static int[] filterMv(int[] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.INT); + int numValues = values.length; + int count = 0; + for (int value : values) { + if (evaluator.matchesInt(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + int[] filtered = new int[count]; + int idx = 0; + for (int value : values) { + if (evaluator.matchesInt(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + public static long[] filterMv(long[] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.LONG); + int numValues = values.length; + int count = 0; + for (long value : values) { + if (evaluator.matchesLong(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + long[] filtered = new long[count]; + int idx = 0; + for (long value : values) { + if (evaluator.matchesLong(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + public static float[] filterMv(float[] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.FLOAT); + int numValues = values.length; + int count = 0; + for (float value : values) { + if (evaluator.matchesFloat(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + 
float[] filtered = new float[count]; + int idx = 0; + for (float value : values) { + if (evaluator.matchesFloat(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + public static double[] filterMv(double[] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.DOUBLE); + int numValues = values.length; + int count = 0; + for (double value : values) { + if (evaluator.matchesDouble(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + double[] filtered = new double[count]; + int idx = 0; + for (double value : values) { + if (evaluator.matchesDouble(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + public static String[] filterMv(String[] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.STRING); + int numValues = values.length; + int count = 0; + for (String value : values) { + if (evaluator.matchesString(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + String[] filtered = new String[count]; + int idx = 0; + for (String value : values) { + if (evaluator.matchesString(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + public static byte[][] filterMv(byte[][] values, String predicate) { + FilterMvPredicateEvaluator evaluator = evaluatorFor(predicate, DataType.BYTES); + int numValues = values.length; + int count = 0; + for (byte[] value : values) { + if (evaluator.matchesBytes(value)) { + count++; + } + } + if (count == numValues) { + return values; + } + byte[][] filtered = new byte[count][]; + int idx = 0; + for (byte[] value : values) { + if (evaluator.matchesBytes(value)) { + filtered[idx++] = value; + } + } + return filtered; + } + + private static FilterMvPredicateEvaluator evaluatorFor(String predicate, DataType dataType) { + CacheKey key = new CacheKey(predicate, dataType); + try { + return EVALUATOR_CACHE.get(key, + () -> 
FilterMvPredicateEvaluator.forPredicate(predicate, dataType, null)); + } catch (Exception e) { + throw new RuntimeException("Failed to create predicate evaluator for: " + predicate, e); + } Review Comment: `evaluatorFor()` wraps all predicate-parsing failures into a plain `RuntimeException`, which will surface as a query execution error (QUERY_EXECUTION) instead of a query validation error (QUERY_VALIDATION). Consider propagating `BadQueryRequestException` (or wrapping failures into it) so invalid predicates produce a proper user-facing 4xx/validation response and consistent error handling across engines. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/FilterMvPredicateEvaluator.java: ########## @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.transform.function; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.common.request.context.predicate.Predicate; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.sql.parsers.CalciteSqlParser; + + +public final class FilterMvPredicateEvaluator { + private static final String VALUE_PLACEHOLDER = "v"; + private static final EnumSet<Predicate.Type> SUPPORTED_PREDICATE_TYPES = + EnumSet.of(Predicate.Type.EQ, Predicate.Type.NOT_EQ, Predicate.Type.IN, Predicate.Type.NOT_IN, + Predicate.Type.RANGE, Predicate.Type.REGEXP_LIKE); + + private final EvalNode _root; + + private FilterMvPredicateEvaluator(EvalNode root) { + _root = root; + } + + public static FilterMvPredicateEvaluator forPredicate(String predicate, DataType dataType, + @Nullable Dictionary dictionary) { + if (StringUtils.isBlank(predicate)) { + throw new IllegalArgumentException("filterMv predicate must be a non-empty string"); + } + FilterContext filterContext = parseFilterContext(predicate); + validateFilter(filterContext); + EvalNode root = buildNode(filterContext, dictionary, dataType); + return new FilterMvPredicateEvaluator(root); + } + + private static FilterContext parseFilterContext(String predicate) { + Expression expression = CalciteSqlParser.compileToExpression(predicate); + ExpressionContext expressionContext = 
RequestContextUtils.getExpression(expression); + return RequestContextUtils.getFilter(expressionContext); + } + + private static void validateFilter(FilterContext filterContext) { + switch (filterContext.getType()) { + case CONSTANT: + return; + case PREDICATE: + Predicate predicate = filterContext.getPredicate(); + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { + throw new IllegalArgumentException( + "filterMv only supports predicates on placeholder '" + VALUE_PLACEHOLDER + "' without functions"); + } + if (!StringUtils.equalsIgnoreCase(lhs.getIdentifier(), VALUE_PLACEHOLDER)) { + throw new IllegalArgumentException( + "filterMv predicate must reference placeholder '" + VALUE_PLACEHOLDER + "', but found '" + + lhs.getIdentifier() + "'"); + } + return; + case AND: + case OR: + for (FilterContext child : filterContext.getChildren()) { + validateFilter(child); + } + return; + case NOT: + validateFilter(filterContext.getChildren().get(0)); + return; + default: + throw new IllegalStateException("Unsupported filter type: " + filterContext.getType()); Review Comment: `validateFilter()` throws `IllegalStateException` for unsupported filter types. Because this is triggered by user input, it should be treated as a bad query (e.g., `IllegalArgumentException` or `BadQueryRequestException`) so it gets reported as a validation error rather than an internal error. ```suggestion throw new IllegalArgumentException("Unsupported filter type: " + filterContext.getType()); ``` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/FilterMvPredicateEvaluator.java: ########## @@ -0,0 +1,450 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.transform.function; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.RequestContextUtils; +import org.apache.pinot.common.request.context.predicate.Predicate; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.sql.parsers.CalciteSqlParser; + + +public final class FilterMvPredicateEvaluator { + private static final String VALUE_PLACEHOLDER = "v"; + private static final EnumSet<Predicate.Type> SUPPORTED_PREDICATE_TYPES = + EnumSet.of(Predicate.Type.EQ, Predicate.Type.NOT_EQ, Predicate.Type.IN, Predicate.Type.NOT_IN, + Predicate.Type.RANGE, Predicate.Type.REGEXP_LIKE); + + private final EvalNode _root; + + private FilterMvPredicateEvaluator(EvalNode root) { + _root = root; + } + + public static FilterMvPredicateEvaluator forPredicate(String predicate, DataType 
dataType, + @Nullable Dictionary dictionary) { + if (StringUtils.isBlank(predicate)) { + throw new IllegalArgumentException("filterMv predicate must be a non-empty string"); + } + FilterContext filterContext = parseFilterContext(predicate); + validateFilter(filterContext); + EvalNode root = buildNode(filterContext, dictionary, dataType); + return new FilterMvPredicateEvaluator(root); + } + + private static FilterContext parseFilterContext(String predicate) { + Expression expression = CalciteSqlParser.compileToExpression(predicate); + ExpressionContext expressionContext = RequestContextUtils.getExpression(expression); + return RequestContextUtils.getFilter(expressionContext); + } + + private static void validateFilter(FilterContext filterContext) { + switch (filterContext.getType()) { + case CONSTANT: + return; + case PREDICATE: + Predicate predicate = filterContext.getPredicate(); + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { + throw new IllegalArgumentException( + "filterMv only supports predicates on placeholder '" + VALUE_PLACEHOLDER + "' without functions"); + } + if (!StringUtils.equalsIgnoreCase(lhs.getIdentifier(), VALUE_PLACEHOLDER)) { + throw new IllegalArgumentException( + "filterMv predicate must reference placeholder '" + VALUE_PLACEHOLDER + "', but found '" + + lhs.getIdentifier() + "'"); + } + return; + case AND: + case OR: + for (FilterContext child : filterContext.getChildren()) { + validateFilter(child); + } + return; + case NOT: + validateFilter(filterContext.getChildren().get(0)); + return; + default: + throw new IllegalStateException("Unsupported filter type: " + filterContext.getType()); + } + } + + private static EvalNode buildNode(FilterContext filterContext, @Nullable Dictionary dictionary, DataType dataType) { + switch (filterContext.getType()) { + case CONSTANT: + return EvalNode.constant(filterContext.isConstantTrue()); + case PREDICATE: + Predicate predicate = 
filterContext.getPredicate(); + if (!SUPPORTED_PREDICATE_TYPES.contains(predicate.getType())) { + throw new IllegalArgumentException( + "filterMv does not support predicate type: " + predicate.getType()); + } + PredicateEvaluator evaluator = + PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dictionary, dataType); + if (evaluator.isAlwaysTrue()) { + return EvalNode.constant(true); + } + if (evaluator.isAlwaysFalse()) { + return EvalNode.constant(false); + } + return EvalNode.predicate(evaluator); + case AND: { + List<EvalNode> children = new ArrayList<>(); + for (FilterContext child : filterContext.getChildren()) { + EvalNode node = buildNode(child, dictionary, dataType); + if (node.isConstantFalse()) { + return EvalNode.constant(false); + } + if (!node.isConstantTrue()) { + children.add(node); + } + } + if (children.isEmpty()) { + return EvalNode.constant(true); + } + if (children.size() == 1) { + return children.get(0); + } + return EvalNode.and(children); + } + case OR: { + List<EvalNode> children = new ArrayList<>(); + for (FilterContext child : filterContext.getChildren()) { + EvalNode node = buildNode(child, dictionary, dataType); + if (node.isConstantTrue()) { + return EvalNode.constant(true); + } + if (!node.isConstantFalse()) { + children.add(node); + } + } + if (children.isEmpty()) { + return EvalNode.constant(false); + } + if (children.size() == 1) { + return children.get(0); + } + return EvalNode.or(children); + } + case NOT: { + EvalNode child = buildNode(filterContext.getChildren().get(0), dictionary, dataType); + if (child.isConstant()) { + return EvalNode.constant(!child.getConstantValue()); + } + return EvalNode.not(child); + } + default: + throw new IllegalStateException("Unsupported filter type: " + filterContext.getType()); Review Comment: `buildNode()` uses `IllegalStateException` for unexpected/unsupported filter node types. 
Since this can be reached from malformed predicate strings, prefer a query-validation exception (`BadQueryRequestException` / `IllegalArgumentException`) to avoid reporting it as an internal error. ```suggestion throw new IllegalArgumentException("Unsupported filter type: " + filterContext.getType()); ``` ########## pinot-core/src/main/java/org/apache/pinot/core/function/scalar/FilterMvScalarFunction.java: ########## @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.function.scalar; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import java.util.EnumMap; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.FunctionInfo; +import org.apache.pinot.common.function.PinotScalarFunction; +import org.apache.pinot.common.function.sql.PinotSqlFunction; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.operator.transform.function.FilterMvPredicateEvaluator; +import org.apache.pinot.spi.annotations.ScalarFunction; +import org.apache.pinot.spi.data.FieldSpec.DataType; + + +/** + * Scalar wrapper for filterMv so FunctionRegistry can expose type signatures for query planning and execution paths + * that resolve scalar functions. + */ +@ScalarFunction(names = {"filterMv"}) +public class FilterMvScalarFunction implements PinotScalarFunction { + private static final int MAX_CACHED_EVALUATORS = 10_000; + private static final Map<ColumnDataType, FunctionInfo> TYPE_FUNCTION_INFO_MAP = + new EnumMap<>(ColumnDataType.class); + private static final Cache<CacheKey, FilterMvPredicateEvaluator> EVALUATOR_CACHE = + CacheBuilder.newBuilder().maximumSize(MAX_CACHED_EVALUATORS).build(); + + static { + try { + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.INT_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", int[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.LONG_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", long[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.FLOAT_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", float[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.DOUBLE_ARRAY, + new 
FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", double[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.STRING_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", String[].class, String.class), + FilterMvScalarFunction.class, false)); + TYPE_FUNCTION_INFO_MAP.put(ColumnDataType.BYTES_ARRAY, + new FunctionInfo(FilterMvScalarFunction.class.getMethod("filterMv", byte[][].class, String.class), + FilterMvScalarFunction.class, false)); Review Comment: `filterMv` adds explicit `BYTES_ARRAY` support (both signature registration and implementation), but the new/updated tests only cover INT/STRING (unit) and LONG/STRING (integration). Please add at least one test that exercises BYTES MV filtering (including a negative and positive match) to prevent regressions in this newly introduced type support. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org
