This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d5014786db Add first implementation of clpMatch that doesn't
explicitly use indexes. (#12291)
d5014786db is described below
commit d5014786dbd364e65a0fbd9596c4e59830de1bf9
Author: kirkrodrigues <[email protected]>
AuthorDate: Tue Jan 30 13:54:29 2024 -0500
Add first implementation of clpMatch that doesn't explicitly use indexes.
(#12291)
* Rename CLPDecodeRewriter to ClpRewriter in preparation for adding
CLP_MATCH function.
* Add version of clpMatch that doesn't rely on indexing.
* Rename CLP transform functions test file.
* Add unit tests for clpEncodedVarsMatch.
* Minor logging improvements for clpEncodedVarsMatch.
* Don't try to flatten filter expression for expression without filters.
* Add clpencodedvarsmatch to list of functions without a scalar counterpart.
---
LICENSE-binary | 2 +-
pinot-common/pom.xml | 4 +
.../common/function/TransformFunctionType.java | 3 +
.../sql/parsers/rewriter/CLPDecodeRewriter.java | 177 ------
.../pinot/sql/parsers/rewriter/ClpRewriter.java | 633 +++++++++++++++++++++
.../function/FunctionDefinitionRegistryTest.java | 6 +-
.../parsers/rewriter/CLPDecodeRewriterTest.java | 65 ---
.../sql/parsers/rewriter/ClpRewriterTest.java | 282 +++++++++
.../ClpEncodedVarsMatchTransformFunction.java | 150 +++++
.../function/TransformFunctionFactory.java | 1 +
...ionTest.java => ClpTransformFunctionsTest.java} | 111 +++-
.../inputformat/clplog/CLPLogRecordExtractor.java | 8 +-
.../clplog/CLPLogRecordExtractorTest.java | 14 +-
13 files changed, 1188 insertions(+), 268 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 0a3dac129e..7c3ce80093 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -280,7 +280,7 @@ com.uber:h3:4.1.1
com.yahoo.datasketches:memory:0.8.3
com.yahoo.datasketches:sketches-core:0.8.3
com.yammer.metrics:metrics-core:2.2.0
-com.yscope.clp:clp-ffi:0.4.3
+com.yscope.clp:clp-ffi:0.4.4
com.zaxxer:HikariCP-java7:2.4.13
commons-beanutils:commons-beanutils:1.9.4
commons-cli:commons-cli:1.5.0
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index d5b3ee6a7e..d6708e59a3 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -397,6 +397,10 @@
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.yscope.clp</groupId>
+ <artifactId>clp-ffi</artifactId>
+ </dependency>
<dependency>
<groupId>io.github.hakky54</groupId>
<artifactId>sslcontext-kickstart-for-netty</artifactId>
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index 38fb21e89a..7753260192 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -207,6 +207,9 @@ public enum TransformFunctionType {
CLP_DECODE("clpDecode", ReturnTypes.VARCHAR_2000_NULLABLE,
OperandTypes.family(
ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.ANY,
SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER),
ordinal -> ordinal > 2), "clp_decode"),
+  // clpEncodedVarsMatch(logtypeColumn, encodedVarsColumn, 'wildcardQuery', subqueryIdx) -- ClpRewriter generates all
+  // four operands. The trailing subquery index is declared optional (ordinal > 2) so 3-operand calls stay accepted.
+  CLP_ENCODED_VARS_MATCH("clpEncodedVarsMatch", ReturnTypes.BOOLEAN_NOT_NULL,
+      OperandTypes.family(
+          ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
+          ordinal -> ordinal > 2), "clp_encoded_vars_match"),
// Regexp functions
REGEXP_EXTRACT("regexpExtract", "regexp_extract"),
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriter.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriter.java
deleted file mode 100644
index aff7266153..0000000000
---
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriter.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.sql.parsers.rewriter;
-
-import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.function.TransformFunctionType;
-import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.request.ExpressionType;
-import org.apache.pinot.common.request.Function;
-import org.apache.pinot.common.request.Identifier;
-import org.apache.pinot.common.request.Literal;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.sql.parsers.SqlCompilationException;
-
-
-/**
- * Query rewriter to rewrite clpDecode so that users can pass in the name of a
CLP-encoded column group instead of the
- * names of all the columns in the group.
- * <p>
- * Usage:
- * <pre>
- * clpDecode("columnGroupName"[, defaultValue])
- * </pre>
- * which will be rewritten to:
- * <pre>
- * clpDecode("columnGroupName_logtype", "columnGroupName_dictionaryVars",
"columnGroupName_encodedVars"[,
- * defaultValue])
- * </pre>
- * The "defaultValue" is optional. See
- * {@link
org.apache.pinot.core.operator.transform.function.CLPDecodeTransformFunction}
for its description.
- * <p>
- * Sample queries:
- * <pre>
- * SELECT clpDecode("message") FROM table
- * SELECT clpDecode("message", 'null') FROM table
- * </pre>
- * See {@link
org.apache.pinot.core.operator.transform.function.CLPDecodeTransformFunction}
for details about the
- * underlying clpDecode transformer.
- */
-public class CLPDecodeRewriter implements QueryRewriter {
- public static final String LOGTYPE_COLUMN_SUFFIX = "_logtype";
- public static final String DICTIONARY_VARS_COLUMN_SUFFIX = "_dictionaryVars";
- public static final String ENCODED_VARS_COLUMN_SUFFIX = "_encodedVars";
-
- private static final String _CLPDECODE_LOWERCASE_TRANSFORM_NAME =
- TransformFunctionType.CLP_DECODE.getName().toLowerCase();
-
- @Override
- public PinotQuery rewrite(PinotQuery pinotQuery) {
- List<Expression> selectExpressions = pinotQuery.getSelectList();
- if (null != selectExpressions) {
- for (Expression e : selectExpressions) {
- tryRewritingExpression(e);
- }
- }
- List<Expression> groupByExpressions = pinotQuery.getGroupByList();
- if (null != groupByExpressions) {
- for (Expression e : groupByExpressions) {
- tryRewritingExpression(e);
- }
- }
- List<Expression> orderByExpressions = pinotQuery.getOrderByList();
- if (null != orderByExpressions) {
- for (Expression e : orderByExpressions) {
- tryRewritingExpression(e);
- }
- }
- tryRewritingExpression(pinotQuery.getFilterExpression());
- tryRewritingExpression(pinotQuery.getHavingExpression());
- return pinotQuery;
- }
-
- /**
- * Rewrites any instances of clpDecode in the given expression
- * @param expression Expression which may contain instances of clpDecode
- */
- private void tryRewritingExpression(Expression expression) {
- if (null == expression) {
- return;
- }
- Function function = expression.getFunctionCall();
- if (null == function) {
- return;
- }
-
- String functionName = function.getOperator();
- if (functionName.equals(_CLPDECODE_LOWERCASE_TRANSFORM_NAME)) {
- rewriteCLPDecodeFunction(expression);
- return;
- }
-
- // Function isn't a CLP function that needs rewriting, but the arguments
might be, so we recursively process them.
- for (Expression op : function.getOperands()) {
- tryRewritingExpression(op);
- }
- }
-
- /**
- * Rewrites the given instance of clpDecode as described in the class'
Javadoc
- * @param expression clpDecode function expression
- */
- private void rewriteCLPDecodeFunction(Expression expression) {
- Function function = expression.getFunctionCall();
- List<Expression> arguments = function.getOperands();
-
- // Validate clpDecode's arguments
- int numArgs = arguments.size();
- if (numArgs < 1 || numArgs > 2) {
- // Too few/many args for this rewriter, so do nothing and let it pass
through to the clpDecode transform function
- return;
- }
-
- Expression arg0 = arguments.get(0);
- if (ExpressionType.IDENTIFIER != arg0.getType()) {
- throw new SqlCompilationException("clpDecode: 1st argument must be a
column group name (identifier).");
- }
- String columnGroupName = arg0.getIdentifier().getName();
-
- Literal defaultValueLiteral = null;
- if (numArgs > 1) {
- Expression arg1 = arguments.get(1);
- if (ExpressionType.LITERAL != arg1.getType()) {
- throw new SqlCompilationException("clpDecode: 2nd argument must be a
default value (literal).");
- }
- defaultValueLiteral = arg1.getLiteral();
- }
-
- // Replace the columnGroup with the individual columns
- arguments.clear();
- addCLPDecodeOperands(columnGroupName, defaultValueLiteral, function);
- }
-
- /**
- * Adds the CLPDecode transform function's operands to the given function
- * @param columnGroupName Name of the CLP-encoded column group
- * @param defaultValueLiteral Optional default value to pass through to the
transform function
- * @param clpDecode The function to add the operands to
- */
- private void addCLPDecodeOperands(String columnGroupName, @Nullable Literal
defaultValueLiteral, Function clpDecode) {
- Expression e;
-
- e = new Expression(ExpressionType.IDENTIFIER);
- e.setIdentifier(new Identifier(columnGroupName + LOGTYPE_COLUMN_SUFFIX));
- clpDecode.addToOperands(e);
-
- e = new Expression(ExpressionType.IDENTIFIER);
- e.setIdentifier(new Identifier(columnGroupName +
DICTIONARY_VARS_COLUMN_SUFFIX));
- clpDecode.addToOperands(e);
-
- e = new Expression(ExpressionType.IDENTIFIER);
- e.setIdentifier(new Identifier(columnGroupName +
ENCODED_VARS_COLUMN_SUFFIX));
- clpDecode.addToOperands(e);
-
- if (null != defaultValueLiteral) {
- e = new Expression(ExpressionType.LITERAL);
- e.setLiteral(defaultValueLiteral);
- clpDecode.addToOperands(e);
- }
- }
-}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriter.java
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriter.java
new file mode 100644
index 0000000000..4fdddac57d
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriter.java
@@ -0,0 +1,633 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import
com.yscope.clp.compressorfrontend.AbstractClpEncodedSubquery.VariableWildcardQuery;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.ByteSegment;
+import com.yscope.clp.compressorfrontend.EightByteClpEncodedSubquery;
+import com.yscope.clp.compressorfrontend.EightByteClpWildcardQueryEncoder;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.Literal;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.context.predicate.Predicate;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+
+
+/**
+ * Query rewriter for the CLP-related functions {@code clpDecode} and {@code clpMatch}.
+ * <ul>
+ *   <li>{@code clpDecode} is rewritten so that users can pass the name of a CLP-encoded column group instead of the
+ *   names of every column in the group.</li>
+ *   <li>{@code clpMatch} is rewritten from a wildcard query into a SQL boolean expression that implements CLP's query
+ *   processing logic.</li>
+ * </ul>
+ * <p>
+ * clpDecode usage:
+ * <pre>
+ *   clpDecode("columnGroupName"[, defaultValue])
+ * </pre>
+ * is rewritten to:
+ * <pre>
+ *   clpDecode("columnGroupName_logtype", "columnGroupName_dictionaryVars", "columnGroupName_encodedVars"[,
+ *   defaultValue])
+ * </pre>
+ * "defaultValue" is optional. See
+ * {@link org.apache.pinot.core.operator.transform.function.CLPDecodeTransformFunction} for its description and for
+ * details about the underlying clpDecode transformer.
+ * <p>
+ * Sample queries:
+ * <pre>
+ *   SELECT clpDecode("message") FROM table
+ *   SELECT clpDecode("message", 'null') FROM table
+ * </pre>
+ * <p>
+ * clpMatch usage (only allowed inside filter expressions):
+ * <pre>
+ *   clpMatch("columnGroupName", 'wildcardQuery')
+ * </pre>
+ * or
+ * <pre>
+ *   clpMatch("columnGroupName_logtype", "columnGroupName_dictionaryVars", "columnGroupName_encodedVars",
+ *   'wildcardQuery')
+ * </pre>
+ * <p>
+ * Sample queries:
+ * <pre>
+ *   SELECT * FROM table WHERE clpMatch(message, '* job1 failed *')
+ *   SELECT * FROM table WHERE clpMatch(message_logtype, message_dictionaryVars, message_encodedVars, '* job1 failed *')
+ * </pre>
+ */
+public class ClpRewriter implements QueryRewriter {
+  // Suffixes appended to a column group's name to derive the names of its physical columns.
+  public static final String LOGTYPE_COLUMN_SUFFIX = "_logtype";
+  public static final String DICTIONARY_VARS_COLUMN_SUFFIX = "_dictionaryVars";
+  public static final String ENCODED_VARS_COLUMN_SUFFIX = "_encodedVars";
+
+  // Operator names compared (with String#equals) against Function#getOperator().
+  private static final String _CLPDECODE_LOWERCASE_TRANSFORM_NAME =
+      TransformFunctionType.CLP_DECODE.getName().toLowerCase();
+  private static final String _CLPMATCH_LOWERCASE_FUNCTION_NAME = "clpmatch";
+
+  // NOTE: despite its name, this holds the enum constant name "REGEXP_LIKE" (upper case); it is used as the operator
+  // name of the REGEXP_LIKE calls this rewriter generates.
+  private static final String _REGEXP_LIKE_LOWERCASE_FUNCTION_NAME = Predicate.Type.REGEXP_LIKE.name();
+
+  // Regex metacharacters that must be escaped when they occur literally in a wildcard query. '*' and '?' are absent
+  // because they are wildcards; '\' is absent because it is the wildcard query's own escape character.
+  private static final char[] _NON_WILDCARD_REGEX_META_CHARACTERS = {
+      '^', '$', '.', '{', '}', '[', ']', '(', ')', '+', '|', '<', '>', '-', '/', '=', '!'
+  };
+
+  /**
+   * Rewrites every clpDecode/clpMatch call in the query in place. SELECT, GROUP BY and ORDER BY expressions are
+   * visited as non-filter expressions; the WHERE and HAVING expressions are visited as filter expressions.
+   */
+  @Override
+  public PinotQuery rewrite(PinotQuery pinotQuery) {
+    rewriteNonFilterExpressions(pinotQuery.getSelectList());
+    rewriteNonFilterExpressions(pinotQuery.getGroupByList());
+    rewriteNonFilterExpressions(pinotQuery.getOrderByList());
+    tryRewritingExpression(pinotQuery.getFilterExpression(), true);
+    tryRewritingExpression(pinotQuery.getHavingExpression(), true);
+    return pinotQuery;
+  }
+
+  /**
+   * Visits each expression of the given list as a non-filter expression.
+   * @param expressions Expressions to rewrite in place; a null list is ignored.
+   */
+  private void rewriteNonFilterExpressions(@Nullable List<Expression> expressions) {
+    if (expressions == null) {
+      return;
+    }
+    for (Expression expression : expressions) {
+      tryRewritingExpression(expression, false);
+    }
+  }
+
+  /**
+   * Rewrites, in place, any clpDecode or clpMatch calls found in the given expression tree.
+   * @param expression Expression that may contain clpDecode/clpMatch calls; null is ignored.
+   * @param isFilterExpression Whether the expression is in a filter context: the root WHERE/HAVING expression or
+   *                           anything beneath a FILTER call.
+   * @throws SqlCompilationException if clpMatch is called outside a filter context.
+   */
+  private void tryRewritingExpression(Expression expression, boolean isFilterExpression) {
+    Function function = (expression == null) ? null : expression.getFunctionCall();
+    if (function == null) {
+      // Nothing to rewrite in identifiers, literals or absent expressions
+      return;
+    }
+
+    String operator = function.getOperator();
+    if (operator.equals(_CLPDECODE_LOWERCASE_TRANSFORM_NAME)) {
+      rewriteCLPDecodeFunction(expression);
+    } else if (operator.equals(_CLPMATCH_LOWERCASE_FUNCTION_NAME)) {
+      if (!isFilterExpression) {
+        throw new SqlCompilationException(
+            _CLPMATCH_LOWERCASE_FUNCTION_NAME + " cannot be used outside filter expressions.");
+      }
+      rewriteClpMatchFunction(expression);
+    } else if (isClpMatchEqualsFunctionCall(function)) {
+      // Work around https://github.com/apache/pinot/issues/10478
+      replaceClpMatchEquals(expression, function);
+    } else if (isInvertedClpMatchEqualsFunctionCall(function)) {
+      // `NOT clpMatch(...) = true` becomes `NOT <boolean expression equivalent to clpMatch(...)>`
+      Expression negated = function.getOperands().get(0);
+      replaceClpMatchEquals(negated, negated.getFunctionCall());
+    } else {
+      // Not a CLP call itself, but an operand might contain one. Everything beneath FILTER is a filter context.
+      boolean operandsInFilter = isFilterExpression || operator.equals(SqlKind.FILTER.lowerName);
+      for (Expression operand : function.getOperands()) {
+        tryRewritingExpression(operand, operandsInFilter);
+      }
+    }
+  }
+
+  /**
+   * Detects the shape {@code NOT (clpMatch(...) = true)}.
+   * @param function Function call to inspect
+   * @return true if {@code function} is a NOT whose first operand is a {@code clpMatch(...) = true} call
+   */
+  private boolean isInvertedClpMatchEqualsFunctionCall(Function function) {
+    if (!function.getOperator().equals(SqlKind.NOT.name())) {
+      return false;
+    }
+    Expression negated = function.getOperands().get(0);
+    return negated.getType().equals(ExpressionType.FUNCTION)
+        && isClpMatchEqualsFunctionCall(negated.getFunctionCall());
+  }
+
+  /**
+   * Detects the shape {@code clpMatch(...) = true}, with clpMatch on the left and the literal on the right.
+   * @param function Function call to inspect
+   * @return true if {@code function} is an EQUALS comparing a clpMatch call against the boolean literal {@code true}
+   */
+  private boolean isClpMatchEqualsFunctionCall(Function function) {
+    if (!function.getOperator().equals(SqlKind.EQUALS.name())) {
+      return false;
+    }
+
+    List<Expression> operands = function.getOperands();
+    Expression lhs = operands.get(0);
+    Expression rhs = operands.get(1);
+    boolean isFunctionVsLiteral =
+        lhs.getType().equals(ExpressionType.FUNCTION) && rhs.getType().equals(ExpressionType.LITERAL);
+    if (!isFunctionVsLiteral) {
+      return false;
+    }
+
+    Literal rhsLiteral = rhs.getLiteral();
+    return lhs.getFunctionCall().getOperator().equals(_CLPMATCH_LOWERCASE_FUNCTION_NAME)
+        && rhsLiteral.isSetBoolValue() && rhsLiteral.getBoolValue();
+  }
+
+  /**
+   * Replaces {@code clpMatch(...) = true} with the boolean expression equivalent to {@code clpMatch(...)}.
+   * @param expression Expression holding the EQUALS call; its function call is replaced in place
+   * @param function The EQUALS call, whose first operand is the clpMatch call
+   */
+  private void replaceClpMatchEquals(Expression expression, Function function) {
+    Expression clpMatchExpression = function.getOperands().get(0);
+    // Expand clpMatch first, then hoist the expansion up so it replaces the whole comparison
+    rewriteClpMatchFunction(clpMatchExpression);
+    expression.setFunctionCall(clpMatchExpression.getFunctionCall());
+  }
+
+  /**
+   * Rewrites {@code clpMatch(...)} in place using CLP's query translation logic. Two forms are accepted:
+   * {@code clpMatch(columnGroupName, 'query')} and
+   * {@code clpMatch(logtypeColumn, dictionaryVarsColumn, encodedVarsColumn, 'query')}.
+   * @param expression Expression holding the clpMatch call
+   * @throws SqlCompilationException if the number or types of the operands are invalid
+   */
+  private void rewriteClpMatchFunction(Expression expression) {
+    List<Expression> operands = expression.getFunctionCall().getOperands();
+    int numOperands = operands.size();
+
+    if (numOperands == 2) {
+      // Handle clpMatch("<columnGroupName>", '<query>')
+      Expression op0 = operands.get(0);
+      if (ExpressionType.IDENTIFIER != op0.getType()) {
+        throw new SqlCompilationException("clpMatch: 1st operand must be an identifier.");
+      }
+      String columnGroupName = op0.getIdentifier().getName();
+      String wildcardQuery = getWildcardQueryOperand(operands.get(1), "2nd");
+
+      rewriteClpMatchFunction(expression, columnGroupName + LOGTYPE_COLUMN_SUFFIX,
+          columnGroupName + DICTIONARY_VARS_COLUMN_SUFFIX, columnGroupName + ENCODED_VARS_COLUMN_SUFFIX, wildcardQuery);
+    } else if (numOperands == 4) {
+      // Handle clpMatch("<columnGroupName>_logtype", "<columnGroupName>_dictionaryVars",
+      // "<columnGroupName>_encodedVars", '<query>')
+      for (int i = 0; i < 3; i++) {
+        if (ExpressionType.IDENTIFIER != operands.get(i).getType()) {
+          throw new SqlCompilationException("clpMatch: First three operands must be identifiers.");
+        }
+      }
+      String logtypeColumnName = operands.get(0).getIdentifier().getName();
+      String dictVarsColumnName = operands.get(1).getIdentifier().getName();
+      String encodedVarsColumnName = operands.get(2).getIdentifier().getName();
+      String wildcardQuery = getWildcardQueryOperand(operands.get(3), "4th");
+
+      rewriteClpMatchFunction(expression, logtypeColumnName, dictVarsColumnName, encodedVarsColumnName, wildcardQuery);
+    } else {
+      throw new SqlCompilationException("clpMatch: Too few/many operands - only 2 or 4 operands are expected.");
+    }
+  }
+
+  /**
+   * Extracts the wildcard query from clpMatch's query operand.
+   * @param operand The query operand
+   * @param position Human-readable position of the operand, used in error messages (e.g. "2nd")
+   * @return The wildcard query
+   * @throws SqlCompilationException if the operand is not a string literal
+   */
+  private static String getWildcardQueryOperand(Expression operand, String position) {
+    if (ExpressionType.LITERAL != operand.getType()) {
+      throw new SqlCompilationException("clpMatch: " + position + " operand must be a literal.");
+    }
+    Literal literal = operand.getLiteral();
+    if (!literal.isSetStringValue()) {
+      // Without this check, getStringValue() on a non-string literal fails with a Thrift union error instead of a
+      // compilation error the user can act on
+      throw new SqlCompilationException("clpMatch: " + position + " operand must be a string literal.");
+    }
+    return literal.getStringValue();
+  }
+
+  /**
+   * Replaces, in place, the function call held by {@code expression} with a boolean SQL expression that implements
+   * CLP's matching of {@code wildcardQuery} against the given CLP-encoded columns.
+   * <p>
+   * Each subquery produced by the CLP query encoder becomes one SQL predicate; several predicates are ORed together.
+   * If any subquery may yield false positives, the result is additionally ANDed with a REGEXP_LIKE over the decoded
+   * value.
+   * @param expression Expression whose function call is replaced
+   * @param logtypeColumnName Name of the logtype column
+   * @param dictionaryVarsColumnName Name of the dictionary variables column
+   * @param encodedVarsColumnName Name of the encoded variables column
+   * @param wildcardQuery The wildcard query
+   */
+  private void rewriteClpMatchFunction(Expression expression, String logtypeColumnName, String dictionaryVarsColumnName,
+      String encodedVarsColumnName, String wildcardQuery) {
+    if (wildcardQuery.isEmpty()) {
+      // An empty query becomes `<logtypeColumnName> = ''`
+      expression.setFunctionCall(createStringColumnMatchFunction(SqlKind.EQUALS.name(), logtypeColumnName, ""));
+      return;
+    }
+
+    EightByteClpWildcardQueryEncoder queryEncoder = new EightByteClpWildcardQueryEncoder(
+        BuiltInVariableHandlingRuleVersions.VariablesSchemaV2, BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    EightByteClpEncodedSubquery[] subqueries = queryEncoder.encode(wildcardQuery);
+
+    boolean requireDecompAndMatch = false;
+    Function subqueriesFunc;
+    if (subqueries.length == 1) {
+      ClpSqlSubqueryGenerationResult onlyResult = convertSubqueryToSql(logtypeColumnName, dictionaryVarsColumnName,
+          encodedVarsColumnName, wildcardQuery, 0, subqueries);
+      requireDecompAndMatch = onlyResult.requiresDecompAndMatch();
+      subqueriesFunc = onlyResult.getSqlFunc();
+    } else {
+      subqueriesFunc = new Function(SqlKind.OR.name());
+      for (int idx = 0; idx < subqueries.length; idx++) {
+        ClpSqlSubqueryGenerationResult result = convertSubqueryToSql(logtypeColumnName, dictionaryVarsColumnName,
+            encodedVarsColumnName, wildcardQuery, idx, subqueries);
+        requireDecompAndMatch |= result.requiresDecompAndMatch();
+        Expression subqueryExpression = new Expression(ExpressionType.FUNCTION);
+        subqueryExpression.setFunctionCall(result.getSqlFunc());
+        subqueriesFunc.addToOperands(subqueryExpression);
+      }
+    }
+
+    if (!requireDecompAndMatch) {
+      expression.setFunctionCall(subqueriesFunc);
+      return;
+    }
+
+    // The predicates over the encoded columns can over-match, so also require the decoded value to match the query:
+    // `<subqueries> AND REGEXP_LIKE(clpDecode(<columns>, ''), '<regex>')`
+    Function clpDecodeCall = new Function(_CLPDECODE_LOWERCASE_TRANSFORM_NAME);
+    addCLPDecodeOperands(logtypeColumnName, dictionaryVarsColumnName, encodedVarsColumnName, Literal.stringValue(""),
+        clpDecodeCall);
+    Expression clpDecodeExpression = new Expression(ExpressionType.FUNCTION);
+    clpDecodeExpression.setFunctionCall(clpDecodeCall);
+
+    Expression regexExpression = new Expression(ExpressionType.LITERAL);
+    regexExpression.setLiteral(Literal.stringValue(wildcardQueryToRegex(wildcardQuery)));
+
+    Function clpDecodeLike = new Function(_REGEXP_LIKE_LOWERCASE_FUNCTION_NAME);
+    clpDecodeLike.addToOperands(clpDecodeExpression);
+    clpDecodeLike.addToOperands(regexExpression);
+
+    Expression subqueriesExpression = new Expression(ExpressionType.FUNCTION);
+    subqueriesExpression.setFunctionCall(subqueriesFunc);
+    Expression clpDecodeLikeExpression = new Expression(ExpressionType.FUNCTION);
+    clpDecodeLikeExpression.setFunctionCall(clpDecodeLike);
+
+    Function conjunction = new Function(SqlKind.AND.name());
+    conjunction.addToOperands(subqueriesExpression);
+    conjunction.addToOperands(clpDecodeLikeExpression);
+    expression.setFunctionCall(conjunction);
+  }
+
+  /**
+   * Expands {@code clpDecode(columnGroupName[, defaultValue])} in place into
+   * {@code clpDecode(<group>_logtype, <group>_dictionaryVars, <group>_encodedVars[, defaultValue])}. Calls with any
+   * other number of operands are left untouched so they reach the clpDecode transform function as written.
+   * @param expression Expression holding the clpDecode call
+   * @throws SqlCompilationException if an operand has the wrong type
+   */
+  private void rewriteCLPDecodeFunction(Expression expression) {
+    Function clpDecode = expression.getFunctionCall();
+    List<Expression> operands = clpDecode.getOperands();
+    int numOperands = operands.size();
+
+    boolean isColumnGroupForm = numOperands == 1 || numOperands == 2;
+    if (!isColumnGroupForm) {
+      return;
+    }
+
+    Expression columnGroup = operands.get(0);
+    if (columnGroup.getType() != ExpressionType.IDENTIFIER) {
+      throw new SqlCompilationException("clpDecode: 1st argument must be a column group name (identifier).");
+    }
+    String columnGroupName = columnGroup.getIdentifier().getName();
+
+    Literal defaultValueLiteral = null;
+    if (numOperands == 2) {
+      Expression defaultValue = operands.get(1);
+      if (defaultValue.getType() != ExpressionType.LITERAL) {
+        throw new SqlCompilationException("clpDecode: 2nd argument must be a default value (literal).");
+      }
+      defaultValueLiteral = defaultValue.getLiteral();
+    }
+
+    // Swap the column group operand for the individual CLP columns (plus the optional default value)
+    operands.clear();
+    addCLPDecodeOperands(columnGroupName, defaultValueLiteral, clpDecode);
+  }
+
+  /**
+   * Converts one CLP-encoded subquery into a SQL boolean expression over the CLP-encoded columns.
+   * <p>
+   * A subquery without variables yields only a match on the logtype column. Otherwise the result ANDs together:
+   * <ul>
+   *   <li>a match on the logtype column;</li>
+   *   <li>an EQUALS on the dictionary variables column for each dictionary variable;</li>
+   *   <li>an EQUALS on the encoded variables column for each encoded variable;</li>
+   *   <li>a REGEXP_LIKE on the dictionary variables column for each dictionary-variable wildcard query;</li>
+   *   <li>a single {@code clpEncodedVarsMatch(...) = true} if there are encoded-variable wildcard queries.</li>
+   * </ul>
+   * @param logtypeColumnName Name of the logtype column
+   * @param dictionaryVarsColumnName Name of the dictionary variables column
+   * @param encodedVarsColumnName Name of the encoded variables column
+   * @param wildcardQuery The original wildcard query (passed through to clpEncodedVarsMatch)
+   * @param subqueryIdx Index within {@code subqueries} of the subquery to convert
+   * @param subqueries All subqueries produced by encoding {@code wildcardQuery}
+   * @return The generated SQL function, plus whether it may produce false positives that must be filtered out by
+   *         decompressing and matching
+   */
+  private ClpSqlSubqueryGenerationResult convertSubqueryToSql(String logtypeColumnName, String dictionaryVarsColumnName,
+      String encodedVarsColumnName, String wildcardQuery, int subqueryIdx, EightByteClpEncodedSubquery[] subqueries) {
+    EightByteClpEncodedSubquery subquery = subqueries[subqueryIdx];
+
+    if (!subquery.containsVariables()) {
+      // Only the logtype has to match, and this subquery is never flagged for decompress-and-match
+      Function f = createLogtypeMatchFunction(logtypeColumnName, subquery.getLogtypeQueryAsString(),
+          subquery.logtypeQueryContainsWildcards());
+      return new ClpSqlSubqueryGenerationResult(false, f);
+    }
+
+    Function subqueryFunc = new Function(SqlKind.AND.name());
+
+    Expression e;
+
+    // Add logtype query
+    Function f = createLogtypeMatchFunction(logtypeColumnName, subquery.getLogtypeQueryAsString(),
+        subquery.logtypeQueryContainsWildcards());
+    e = new Expression(ExpressionType.FUNCTION);
+    e.setFunctionCall(f);
+    subqueryFunc.addToOperands(e);
+
+    // Add any dictionary variables (exact matches)
+    int numDictVars = 0;
+    for (ByteSegment dictVar : subquery.getDictVars()) {
+      f = createStringColumnMatchFunction(SqlKind.EQUALS.name(), dictionaryVarsColumnName, dictVar.toString());
+      e = new Expression(ExpressionType.FUNCTION);
+      e.setFunctionCall(f);
+      subqueryFunc.addToOperands(e);
+
+      ++numDictVars;
+    }
+
+    // Add any encoded variables (exact matches)
+    int numEncodedVars = 0;
+    for (long encodedVar : subquery.getEncodedVars()) {
+      f = new Function(SqlKind.EQUALS.name());
+      f.addToOperands(RequestUtils.getIdentifierExpression(encodedVarsColumnName));
+      f.addToOperands(RequestUtils.getLiteralExpression(encodedVar));
+
+      e = new Expression(ExpressionType.FUNCTION);
+      e.setFunctionCall(f);
+      subqueryFunc.addToOperands(e);
+
+      ++numEncodedVars;
+    }
+
+    // Add any wildcard dictionary variables (regex matches); these also count towards numDictVars
+    for (VariableWildcardQuery varWildcardQuery : subquery.getDictVarWildcardQueries()) {
+      f = createStringColumnMatchFunction(_REGEXP_LIKE_LOWERCASE_FUNCTION_NAME, dictionaryVarsColumnName,
+          wildcardQueryToRegex(varWildcardQuery.getQuery().toString()));
+      e = new Expression(ExpressionType.FUNCTION);
+      e.setFunctionCall(f);
+      subqueryFunc.addToOperands(e);
+
+      ++numDictVars;
+    }
+
+    // Add any wildcard encoded variables. A single clpEncodedVarsMatch call (given the full wildcard query and this
+    // subquery's index) covers all of them, but each one still counts towards numEncodedVars.
+    int numEncodedVarWildcardQueries = subquery.getNumEncodedVarWildcardQueries();
+    numEncodedVars += numEncodedVarWildcardQueries;
+    if (numEncodedVarWildcardQueries > 0) {
+      // Create call to clpEncodedVarsMatch
+      Expression clpEncodedVarsExp = RequestUtils.getFunctionExpression(
+          RequestUtils.canonicalizeFunctionNamePreservingSpecialKey(
+              TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName()));
+      f = clpEncodedVarsExp.getFunctionCall();
+      f.addToOperands(RequestUtils.getIdentifierExpression(logtypeColumnName));
+      f.addToOperands(RequestUtils.getIdentifierExpression(encodedVarsColumnName));
+      f.addToOperands(RequestUtils.getLiteralExpression(wildcardQuery));
+      f.addToOperands(RequestUtils.getLiteralExpression(subqueryIdx));
+
+      // Create `clpEncodedVarsMatch(...) = true`
+      e = RequestUtils.getFunctionExpression(SqlKind.EQUALS.name());
+      f = e.getFunctionCall();
+      f.addToOperands(clpEncodedVarsExp);
+      f.addToOperands(RequestUtils.getLiteralExpression(true));
+
+      subqueryFunc.addToOperands(e);
+    }
+
+    // We require a decompress and match in the following cases:
+    // 1. There are >1 variables of a specific type (dict/encoded) in the query. Consider this query: " dv123 dv456 ".
+    //    The corresponding SQL will look like:
+    //    "x_logtype = '...' AND x_dictionaryVars = 'dv123' AND x_dictionaryVars = 'dv456'"
+    //    This SQL will indeed match values which also match the query; but this SQL will also match values which
+    //    *don't* match the query, like " dv456 dv123 ". This is because the SQL query doesn't encode the position of
+    //    the variables.
+    // 2. There is no more than 1 variable of each type, but the logtype query contains wildcards. Consider this query:
+    //    "user dv123 *". The corresponding SQL will look like:
+    //    "REGEXP_LIKE(x_logtype, '^user \dv .*') AND x_dictionaryVars = 'dv123'",
+    //    where "\dv" stands for a dictionary variable placeholder. This SQL could match the value "user dv123 joined"
+    //    but it could also match "user dv456 joined dv123".
+    boolean requiresDecompAndMatch =
+        !(numDictVars < 2 && numEncodedVars < 2 && !subquery.logtypeQueryContainsWildcards());
+    return new ClpSqlSubqueryGenerationResult(requiresDecompAndMatch, subqueryFunc);
+  }
+
+  /**
+   * Creates a predicate matching the logtype column against a logtype query.
+   * @param columnName Name of the logtype column
+   * @param query Logtype query
+   * @param containsWildcards Whether {@code query} contains wildcards. If so, a REGEXP_LIKE against the equivalent
+   *                          regex is generated; otherwise an exact EQUALS.
+   * @return The predicate function
+   */
+  private Function createLogtypeMatchFunction(String columnName, String query, boolean containsWildcards) {
+    return containsWildcards
+        ? createStringColumnMatchFunction(_REGEXP_LIKE_LOWERCASE_FUNCTION_NAME, columnName,
+            wildcardQueryToRegex(query))
+        : createStringColumnMatchFunction(SqlKind.EQUALS.name(), columnName, query);
+  }
+
+  /**
+   * Creates the call {@code canonicalName(columnName, 'query')}.
+   * @param canonicalName Operator name of the function to create
+   * @param columnName Column passed as the first operand
+   * @param query String literal passed as the second operand
+   * @return The new function
+   */
+  private Function createStringColumnMatchFunction(String canonicalName, String columnName, String query) {
+    Expression columnOperand = RequestUtils.getIdentifierExpression(columnName);
+    Expression queryOperand = RequestUtils.getLiteralExpression(query);
+    Function matchFunction = new Function(canonicalName);
+    matchFunction.addToOperands(columnOperand);
+    matchFunction.addToOperands(queryOperand);
+    return matchFunction;
+  }
+
+  /**
+   * Expands a CLP-encoded column group into its physical column names. Appends them, plus the optional default value,
+   * as operands of the given clpDecode call.
+   * @param columnGroupName Name of the CLP-encoded column group
+   * @param defaultValueLiteral Optional default value to pass through to the transform function
+   * @param clpDecode The function to add the operands to
+   */
+  private void addCLPDecodeOperands(String columnGroupName, @Nullable Literal defaultValueLiteral, Function clpDecode) {
+    String logtypeColumnName = columnGroupName + LOGTYPE_COLUMN_SUFFIX;
+    String dictionaryVarsColumnName = columnGroupName + DICTIONARY_VARS_COLUMN_SUFFIX;
+    String encodedVarsColumnName = columnGroupName + ENCODED_VARS_COLUMN_SUFFIX;
+    addCLPDecodeOperands(logtypeColumnName, dictionaryVarsColumnName, encodedVarsColumnName, defaultValueLiteral,
+        clpDecode);
+  }
+
+  /**
+   * Appends the three CLP column identifiers to the operands of the given clpDecode call, followed by the default
+   * value when one is given.
+   * @param logtypeColumnName Name of the logtype column
+   * @param dictionaryVarsColumnName Name of the dictionary variables column
+   * @param encodedVarsColumnName Name of the encoded variables column
+   * @param defaultValueLiteral Optional default value; not added when null
+   * @param clpDecode The function to add the operands to
+   */
+  private void addCLPDecodeOperands(String logtypeColumnName, String dictionaryVarsColumnName,
+      String encodedVarsColumnName, @Nullable Literal defaultValueLiteral, Function clpDecode) {
+    for (String columnName : new String[]{logtypeColumnName, dictionaryVarsColumnName, encodedVarsColumnName}) {
+      clpDecode.addToOperands(RequestUtils.getIdentifierExpression(columnName));
+    }
+    if (defaultValueLiteral == null) {
+      return;
+    }
+    Expression defaultValueExpression = new Expression(ExpressionType.LITERAL);
+    defaultValueExpression.setLiteral(defaultValueLiteral);
+    clpDecode.addToOperands(defaultValueExpression);
+  }
+
+  /**
+   * Converts a wildcard query into an equivalent regular expression.
+   * <p>
+   * The wildcard query may contain two wildcards:
+   * <ul>
+   *   <li>'*', which matches zero or more characters (translated to {@code .*});</li>
+   *   <li>'?', which matches exactly one character (translated to {@code .}).</li>
+   * </ul>
+   * A '\' escapes the character after it; the escape sequence is copied into the regex as-is. Other regex
+   * metacharacters are escaped. The regex is anchored with '^' unless the query starts with '*'. It is anchored with
+   * '$' unless the query ends with an unescaped '*'.
+   * @param wildcardQuery The wildcard query
+   * @return The regular expression which matches the same values as the wildcard query.
+   */
+  private static String wildcardQueryToRegex(String wildcardQuery) {
+    StringBuilder regex = new StringBuilder(wildcardQuery.length() + 8);
+
+    // Add begin anchor if necessary. A '*' at index 0 can't be escaped, so checking the first char suffices.
+    if (!wildcardQuery.isEmpty() && '*' != wildcardQuery.charAt(0)) {
+      regex.append('^');
+    }
+
+    boolean isEscaped = false;
+    boolean endsWithUnescapedStar = false;
+    int uncopiedIdx = 0;
+    for (int queryIdx = 0; queryIdx < wildcardQuery.length(); queryIdx++) {
+      char queryChar = wildcardQuery.charAt(queryIdx);
+      endsWithUnescapedStar = false;
+      if (isEscaped) {
+        // The escaped char is copied through together with its preceding '\'
+        isEscaped = false;
+      } else if ('\\' == queryChar) {
+        isEscaped = true;
+      } else if (isWildcard(queryChar)) {
+        regex.append(wildcardQuery, uncopiedIdx, queryIdx);
+        // '?' must match exactly one char, so it becomes "." (not ".?", which would also match zero chars)
+        regex.append('*' == queryChar ? ".*" : ".");
+        uncopiedIdx = queryIdx + 1;
+        endsWithUnescapedStar = '*' == queryChar;
+      } else {
+        for (final char metaChar : _NON_WILDCARD_REGEX_META_CHARACTERS) {
+          if (metaChar == queryChar) {
+            regex.append(wildcardQuery, uncopiedIdx, queryIdx);
+            regex.append('\\');
+            uncopiedIdx = queryIdx;
+            break;
+          }
+        }
+      }
+    }
+    if (uncopiedIdx < wildcardQuery.length()) {
+      regex.append(wildcardQuery, uncopiedIdx, wildcardQuery.length());
+    }
+
+    // Add end anchor unless the query ends with an unescaped '*'. A trailing "\*" is a literal '*' and must still be
+    // anchored, otherwise the regex would also match longer values.
+    if (!wildcardQuery.isEmpty() && !endsWithUnescapedStar) {
+      regex.append('$');
+    }
+
+    return regex.toString();
+  }
+
+  /**
+   * @param c Character to classify
+   * @return Whether {@code c} is one of the wildcard characters '*' or '?'.
+   */
+  private static boolean isWildcard(char c) {
+    switch (c) {
+      case '*':
+      case '?':
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Result of converting one CLP subquery into SQL. Holds the generated SQL function and whether it can match values
+   * that don't match the query; in that case the caller adds a decompress-and-match step.
+   */
+  private static final class ClpSqlSubqueryGenerationResult {
+    private final Function _sqlFunc;
+    private final boolean _requiresDecompAndMatch;
+
+    ClpSqlSubqueryGenerationResult(boolean requiresDecompAndMatch, Function sqlFunc) {
+      _sqlFunc = sqlFunc;
+      _requiresDecompAndMatch = requiresDecompAndMatch;
+    }
+
+    public Function getSqlFunc() {
+      return _sqlFunc;
+    }
+
+    public boolean requiresDecompAndMatch() {
+      return _requiresDecompAndMatch;
+    }
+  }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
index ba4ba42a3c..819c8b84c2 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
@@ -47,9 +47,9 @@ public class FunctionDefinitionRegistryTest {
// Scalar function
"scalar",
// Functions without scalar function counterpart as of now
- "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum",
"clpdecode", "groovy", "inidset",
- "jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup",
"mapvalue", "timeconvert", "valuein",
- "datetimeconvertwindowhop",
+ "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum",
"clpdecode", "clpencodedvarsmatch", "groovy",
+ "inidset", "jsonextractscalar", "jsonextractindex", "jsonextractkey",
"lookup", "mapvalue", "timeconvert",
+ "valuein", "datetimeconvertwindowhop",
// functions not needed for register b/c they are in std sql table or
they will not be composed directly.
"in", "not_in", "and", "or", "range", "extract", "is_true",
"is_not_true", "is_false", "is_not_false"
);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
deleted file mode 100644
index e6bdb8dff6..0000000000
---
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/CLPDecodeRewriterTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.sql.parsers.rewriter;
-
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
-import org.apache.pinot.sql.parsers.SqlCompilationException;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertThrows;
-
-
-public class CLPDecodeRewriterTest {
- private static final QueryRewriter _QUERY_REWRITER = new CLPDecodeRewriter();
-
- @Test
- public void testCLPDecodeRewrite() {
- // clpDecode rewrite from column group to individual columns
- testQueryRewrite("SELECT clpDecode(message) FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars,
message_encodedVars) FROM clpTable");
- testQueryRewrite("SELECT clpDecode(message, 'null') FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars,
message_encodedVars, 'null') FROM clpTable");
-
- // clpDecode passthrough
- testQueryRewrite("SELECT clpDecode(message_logtype,
message_dictionaryVars, message_encodedVars) FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars,
message_encodedVars) FROM clpTable");
- testQueryRewrite(
- "SELECT clpDecode(message_logtype, message_dictionaryVars,
message_encodedVars, 'null') FROM clpTable",
- "SELECT clpDecode(message_logtype, message_dictionaryVars,
message_encodedVars, 'null') FROM clpTable");
- }
-
- @Test
- public void testUnsupportedCLPDecodeQueries() {
- testUnsupportedQuery("SELECT clpDecode('message') FROM clpTable");
- testUnsupportedQuery("SELECT clpDecode('message', 'default') FROM
clpTable");
- testUnsupportedQuery("SELECT clpDecode('message', default) FROM clpTable");
- testUnsupportedQuery("SELECT clpDecode(message, default) FROM clpTable");
- }
-
- private void testQueryRewrite(String original, String expected) {
-
assertEquals(_QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(original)),
- CalciteSqlParser.compileToPinotQuery(expected));
- }
-
- private void testUnsupportedQuery(String query) {
- assertThrows(SqlCompilationException.class,
- () ->
_QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(query)));
- }
-}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
new file mode 100644
index 0000000000..987f4e449b
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/rewriter/ClpRewriterTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.parsers.rewriter;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.fail;
+
+
+public class ClpRewriterTest {
+  private static final QueryRewriter _QUERY_REWRITER = new ClpRewriter();
+
+  @Test
+  public void testCLPDecodeRewrite() {
+    // clpDecode rewrite from column group to individual columns
+    testQueryRewrite("SELECT clpDecode(message) FROM clpTable",
+        "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
+    testQueryRewrite("SELECT clpDecode(message, 'null') FROM clpTable",
+        "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
+
+    // clpDecode passthrough
+    testQueryRewrite("SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable",
+        "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars) FROM clpTable");
+    testQueryRewrite(
+        "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable",
+        "SELECT clpDecode(message_logtype, message_dictionaryVars, message_encodedVars, 'null') FROM clpTable");
+  }
+
+  @Test
+  public void testClpMatchRewrite() {
+    MessageEncoder encoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+    EncodedMessage encodedMessage = new EncodedMessage();
+    try {
+      String message;
+      String[] dictionaryVars;
+      Long[] encodedVars;
+
+      // Query with no wildcards and no variables
+      message = " INFO container ";
+      encoder.encodeMessage(message, encodedMessage);
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s'", encodedMessage.getLogTypeAsString())
+      );
+
+      // Query with no wildcards and no variables using individual column names
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message_logtype, message_dictionaryVars,"
+              + " message_encodedVars, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s'", encodedMessage.getLogTypeAsString())
+      );
+
+      // Query with wildcards and no variables
+      message = " INFO container ";
+      encoder.encodeMessage(message, encodedMessage);
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+          String.format("SELECT * FROM clpTable WHERE REGEXP_LIKE(message_logtype, '.*%s.*')",
+              encodedMessage.getLogTypeAsString())
+      );
+
+      // Query with no wildcards and a single dictionary var
+      message = " var123 ";
+      encoder.encodeMessage(message, encodedMessage);
+      dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'",
+              encodedMessage.getLogTypeAsString(), dictionaryVars[0])
+      );
+
+      // Query with no wildcards and a single encoded var
+      message = " 123 ";
+      encoder.encodeMessage(message, encodedMessage);
+      encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_encodedVars = %s",
+              encodedMessage.getLogTypeAsString(), encodedVars[0])
+      );
+
+      // Query with no wildcards and multiple dictionary vars
+      message = " var123 var456 ";
+      encoder.encodeMessage(message, encodedMessage);
+      dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'"
+                  + " AND message_dictionaryVars = '%s'"
+                  + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+                  + " '%s')",
+              encodedMessage.getLogTypeAsString(), dictionaryVars[0], dictionaryVars[1],
+              String.format("^%s$", message))
+      );
+
+      // Query with no wildcards and multiple encoded vars
+      message = " 123 456 ";
+      encoder.encodeMessage(message, encodedMessage);
+      encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_encodedVars = %s"
+                  + " AND message_encodedVars = %s"
+                  + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+                  + " '%s')",
+              encodedMessage.getLogTypeAsString(), encodedVars[0], encodedVars[1], String.format("^%s$", message))
+      );
+
+      // Query with no wildcards, a dictionary var, and an encoded var
+      message = " var123 456 ";
+      encoder.encodeMessage(message, encodedMessage);
+      dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+      encodedVars = encodedMessage.getEncodedVarsAsBoxedLongs();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '%s')", message),
+          String.format("SELECT * FROM clpTable WHERE message_logtype = '%s' AND message_dictionaryVars = '%s'"
+                  + " AND message_encodedVars = %s", encodedMessage.getLogTypeAsString(), dictionaryVars[0],
+              encodedVars[0])
+      );
+
+      // Query with wildcards for a single dictionary var
+      message = "var123";
+      encoder.encodeMessage(message, encodedMessage);
+      dictionaryVars = encodedMessage.getDictionaryVarsAsStrings();
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+          String.format("SELECT * FROM clpTable WHERE REGEXP_LIKE(message_logtype, '.*%s.*')"
+                  + " AND REGEXP_LIKE(message_dictionaryVars, '.*%s.*')"
+                  + " AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+                  + " '%s')",
+              encodedMessage.getLogTypeAsString(), dictionaryVars[0], String.format(".*%s.*", message))
+      );
+
+      // Query with wildcards for a single var which could be a float encoded var, int encoded var, or dictionary var
+      encoder.encodeMessage("123", encodedMessage);
+      String subquery1Logtype = encodedMessage.getLogTypeAsString();
+      encoder.encodeMessage("123.0", encodedMessage);
+      String subquery2Logtype = encodedMessage.getLogTypeAsString();
+      encoder.encodeMessage("var123", encodedMessage);
+      String subquery3Logtype = encodedMessage.getLogTypeAsString();
+      message = "123";
+      testQueryRewrite(
+          String.format("SELECT * FROM clpTable WHERE clpMatch(message, '*%s*')", message),
+          String.format("SELECT * FROM clpTable WHERE ("
+                  + "(REGEXP_LIKE(message_logtype, '.*%s.*')"
+                  + " AND clpEncodedVarsMatch(message_logtype, message_encodedVars, '*%s*', 0))"
+                  + " OR (REGEXP_LIKE(message_logtype, '.*%s.*')"
+                  + " AND clpEncodedVarsMatch(message_logtype, message_encodedVars, '*%s*', 1))"
+                  + " OR (REGEXP_LIKE(message_logtype, '.*%s.*') AND REGEXP_LIKE(message_dictionaryVars, '.*%s.*'))"
+                  + ") AND REGEXP_LIKE(clpdecode(message_logtype, message_dictionaryVars, message_encodedVars, ''),"
+                  + " '.*%s.*')",
+              subquery1Logtype, message, subquery2Logtype, message, subquery3Logtype, message, message)
+      );
+    } catch (IOException e) {
+      fail("Failed to encode message", e);
+    }
+  }
+
+  /**
+   * @param expr any expression (literal, identifier, or function call)
+   * @return whether {@code expr} is a function call whose operator is AND
+   */
+  private static boolean isAndExpression(Expression expr) {
+    return expr.isSetFunctionCall() && SqlKind.AND.name().equals(expr.getFunctionCall().getOperator());
+  }
+
+  /**
+   * Flattens an AND expression in place. Any operand that is itself an AND is replaced by that operand's own
+   * (recursively flattened) operands. Other operands are kept, but AND expressions nested inside them are flattened
+   * too.
+   * <p>
+   * Ex: "x = '1' AND ('y' = 2 AND NOT 'z' = 3)" would be flattened to
+   * "x = '1' AND 'y' = 2 AND NOT 'z' = 3"
+   * @param expr an expression whose function call is an AND
+   */
+  private void flattenAndExpression(Expression expr) {
+    Function func = expr.getFunctionCall();
+    List<Expression> operands = func.getOperands();
+    if (null == operands) {
+      return;
+    }
+
+    List<Expression> newOperands = new ArrayList<>();
+    for (Expression childOp : operands) {
+      // Flatten the child first so that an AND child contributes already-flattened operands
+      flattenAllAndExpressions(childOp);
+      if (isAndExpression(childOp)) {
+        newOperands.addAll(childOp.getFunctionCall().getOperands());
+      } else {
+        newOperands.add(childOp);
+      }
+    }
+    func.setOperands(newOperands);
+  }
+
+  /**
+   * Flattens, in place, every AND expression found anywhere within the given expression.
+   * <p>
+   * Ex: "a = 0 OR (x = '1' AND ('y' = 2 AND NOT 'z' = 3))" would be flattened to
+   * "a = 0 OR (x = '1' AND 'y' = 2 AND NOT 'z' = 3)"
+   * @param expr the expression to flatten; non-function expressions are left untouched
+   */
+  private void flattenAllAndExpressions(Expression expr) {
+    if (!expr.isSetFunctionCall()) {
+      return;
+    }
+    if (isAndExpression(expr)) {
+      flattenAndExpression(expr);
+      return;
+    }
+
+    // Not an AND itself, so only AND expressions nested within its operands can be flattened
+    List<Expression> operands = expr.getFunctionCall().getOperands();
+    if (null == operands) {
+      return;
+    }
+    for (Expression childOp : operands) {
+      flattenAllAndExpressions(childOp);
+    }
+  }
+
+  @Test
+  public void testUnsupportedCLPDecodeQueries() {
+    testUnsupportedQuery("SELECT clpDecode('message') FROM clpTable");
+    testUnsupportedQuery("SELECT clpDecode('message', 'default') FROM clpTable");
+    testUnsupportedQuery("SELECT clpDecode('message', default) FROM clpTable");
+    testUnsupportedQuery("SELECT clpDecode(message, default) FROM clpTable");
+  }
+
+  /**
+   * Asserts that rewriting {@code original} yields the same query as compiling {@code expected}.
+   */
+  private void testQueryRewrite(String original, String expected) {
+    PinotQuery rewrittenQuery = _QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(original));
+    PinotQuery expectedQuery = CalciteSqlParser.compileToPinotQuery(expected);
+    // Flatten any AND expressions in the rewritten query.
+    // NOTE: The rewritten query may have nested AND conditions of the form (A AND (B AND C)). If we don't flatten them,
+    // comparison with the expected query will fail.
+    Expression rewrittenFilterExpr = rewrittenQuery.getFilterExpression();
+    if (null != rewrittenFilterExpr) {
+      flattenAllAndExpressions(rewrittenFilterExpr);
+    }
+    assertEquals(rewrittenQuery, expectedQuery);
+  }
+
+  /**
+   * Asserts that rewriting {@code query} is rejected with a {@link SqlCompilationException}.
+   */
+  private void testUnsupportedQuery(String query) {
+    assertThrows(SqlCompilationException.class,
+        () -> _QUERY_REWRITER.rewrite(CalciteSqlParser.compileToPinotQuery(query)));
+  }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
new file mode 100644
index 0000000000..ba116e8eb5
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/ClpEncodedVarsMatchTransformFunction.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import com.google.common.base.Preconditions;
+import com.yscope.clp.compressorfrontend.AbstractClpEncodedSubquery;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EightByteClpEncodedSubquery;
+import com.yscope.clp.compressorfrontend.EightByteClpWildcardQueryEncoder;
+import com.yscope.clp.compressorfrontend.MessageDecoder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Performs a wildcard match on the encoded variables of a CLP-encoded column
group. This is used by the clpMatch
+ * function (implemented using {@link
org.apache.pinot.sql.parsers.rewriter.ClpRewriter}) and likely wouldn't be
called
+ * manually by a user.
+ * <p>
+ * Syntax:
+ * <pre>
+ * clpEncodedVarsMatch(columnGroupName_logtype, columnGroupName_encodedVars,
wildcardQuery, subQueryIndex)
+ * </pre>
+ */
+public class ClpEncodedVarsMatchTransformFunction extends
BaseTransformFunction {
+ private static final Logger _logger =
LoggerFactory.getLogger(ClpEncodedVarsMatchTransformFunction.class);
+
+ private final List<TransformFunction> _transformFunctions = new
ArrayList<>();
+ private byte[] _serializedVarTypes;
+ private byte[] _serializedVarWildcardQueries;
+ private int[] _varWildcardQueryEndIndexes;
+
+ @Override
+ public String getName() {
+ return TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName();
+ }
+
+ @Override
+ public void init(List<TransformFunction> arguments, Map<String,
ColumnContext> columnContextMap) {
+ Preconditions.checkArgument(arguments.size() == 4, "Syntax error:
clpEncodedVarsMatch takes 4 arguments - "
+ + "clpEncodedVarsMatch(columnGroupName_logtype,
columnGroupName_encodedVars, wildcardQuery, subQueryIndex");
+
+ Iterator<TransformFunction> argsIter = arguments.iterator();
+
+ TransformFunction f = argsIter.next();
+ Preconditions.checkArgument(f instanceof IdentifierTransformFunction, "1st
argument must be an identifier");
+ _transformFunctions.add(f);
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof IdentifierTransformFunction, "2nd
argument must be an identifier");
+ _transformFunctions.add(f);
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof LiteralTransformFunction, "3rd
argument must be a string literal");
+ String wildcardQuery = ((LiteralTransformFunction) f).getStringLiteral();
+
+ f = argsIter.next();
+ Preconditions.checkArgument(f instanceof LiteralTransformFunction, "4th
argument must be a long literal");
+ long subqueryIndex = ((LiteralTransformFunction) f).getLongLiteral();
+
+ EightByteClpWildcardQueryEncoder queryEncoder =
+ new
EightByteClpWildcardQueryEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ EightByteClpEncodedSubquery[] subqueries =
queryEncoder.encode(wildcardQuery);
+ if (subqueryIndex < 0 || subqueryIndex > subqueries.length) {
+ throw new IllegalArgumentException("Invalid subquery index.");
+ }
+ EightByteClpEncodedSubquery subquery = subqueries[(int) subqueryIndex];
+ int numEncodedVarWildcardQueries =
subquery.getNumEncodedVarWildcardQueries();
+ if (0 == numEncodedVarWildcardQueries) {
+ throw new IllegalArgumentException("Subquery doesn't contain any
wildcard queries for encoded variables.");
+ }
+
+ try {
+ ByteArrayOutputStream serializedVarTypes = new ByteArrayOutputStream();
+ ByteArrayOutputStream serializedWildcardQueries = new
ByteArrayOutputStream();
+ List<Integer> serializedWildcardQueryEndIndices = new ArrayList<>();
+ for (AbstractClpEncodedSubquery.VariableWildcardQuery q :
subquery.getEncodedVarWildcardQueries()) {
+ serializedVarTypes.write(q.getType());
+ serializedWildcardQueries.write(q.getQuery().toByteArray());
+
serializedWildcardQueryEndIndices.add(serializedWildcardQueries.size());
+ }
+ _serializedVarTypes = serializedVarTypes.toByteArray();
+ _serializedVarWildcardQueries = serializedWildcardQueries.toByteArray();
+ _varWildcardQueryEndIndexes =
ArrayUtils.toPrimitive(serializedWildcardQueryEndIndices.toArray(new
Integer[0]));
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Wildcard query could not be
serialized", e);
+ }
+ }
+
+ @Override
+ public TransformResultMetadata getResultMetadata() {
+ return new TransformResultMetadata(FieldSpec.DataType.BOOLEAN, true,
false);
+ }
+
+ @Override
+ public int[] transformToIntValuesSV(ValueBlock valueBlock) {
+ int length = valueBlock.getNumDocs();
+ if (null == _intValuesSV) {
+ _intValuesSV = new int[length];
+ }
+
+ int functionIdx = 0;
+ TransformFunction logtypeTransformFunction =
_transformFunctions.get(functionIdx++);
+ TransformFunction encodedVarsTransformFunction =
_transformFunctions.get(functionIdx++);
+ byte[][] logtypes =
logtypeTransformFunction.transformToBytesValuesSV(valueBlock);
+ long[][] encodedVars =
encodedVarsTransformFunction.transformToLongValuesMV(valueBlock);
+
+ MessageDecoder clpMessageDecoder = new
MessageDecoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+ BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+ try {
+ clpMessageDecoder.batchEncodedVarsWildcardMatch(logtypes, encodedVars,
_serializedVarTypes,
+ _serializedVarWildcardQueries, _varWildcardQueryEndIndexes,
_intValuesSV);
+ } catch (IOException ex) {
+ _logger.error("Failed to perform wildcard match on (CLP) encoded
variables field.", ex);
+ Arrays.fill(_intValuesSV, 0);
+ }
+
+ return _intValuesSV;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index 82afb6dbeb..d5e4d9d481 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -146,6 +146,7 @@ public class TransformFunctionFactory {
typeToImplementation.put(TransformFunctionType.IN_ID_SET,
InIdSetTransformFunction.class);
typeToImplementation.put(TransformFunctionType.LOOKUP,
LookupTransformFunction.class);
typeToImplementation.put(TransformFunctionType.CLP_DECODE,
CLPDecodeTransformFunction.class);
+ typeToImplementation.put(TransformFunctionType.CLP_ENCODED_VARS_MATCH,
ClpEncodedVarsMatchTransformFunction.class);
typeToImplementation.put(TransformFunctionType.EXTRACT,
ExtractTransformFunction.class);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
similarity index 69%
rename from
pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java
rename to
pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
index e0d419dca0..67c1a6ef08 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CLPDecodeTransformFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ClpTransformFunctionsTest.java
@@ -63,7 +63,7 @@ import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
-public class CLPDecodeTransformFunctionTest {
+public class ClpTransformFunctionsTest {
private static final String SEGMENT_NAME = "testSegmentForClpDecode";
private static final String INDEX_DIR_PATH =
FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
private static final String TIMESTAMP_COLUMN = "timestampColumn";
@@ -119,9 +119,10 @@ public class CLPDecodeTransformFunctionTest {
_logtypeValues[NUM_ROWS - 1] = clpEncodedMessage.getLogTypeAsString();
_dictVarValues[NUM_ROWS - 1] =
clpEncodedMessage.getDictionaryVarsAsStrings();
_encodedVarValues[NUM_ROWS - 1] =
clpEncodedMessage.getEncodedVarsAsBoxedLongs();
- // Corrupt the previous two rows, so we can test the default value
+ // Corrupt a row, so we can test the default value
+ // NOTE: We don't corrupt the encoded variables column since that would
cause clpEncodedVarsMatch to detect an
+ // error and abandon the batch, rendering the test useless.
_dictVarValues[NUM_ROWS - 2] = null;
- _encodedVarValues[NUM_ROWS - 3] = null;
List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
for (int i = 0; i < NUM_ROWS; i++) {
@@ -159,7 +160,7 @@ public class CLPDecodeTransformFunctionTest {
}
@Test
- public void testTransform() {
+ public void testClpDecode() {
ExpressionContext expression = RequestContextUtils.getExpression(
String.format("%s(%s,%s,%s)",
TransformFunctionType.CLP_DECODE.getName(), LOGTYPE_COLUMN, DICT_VARS_COLUMN,
ENCODED_VARS_COLUMN));
@@ -168,14 +169,13 @@ public class CLPDecodeTransformFunctionTest {
String[] expectedValues = new String[NUM_ROWS];
Arrays.fill(expectedValues, TEST_MESSAGE);
- expectedValues[NUM_ROWS - 3] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
expectedValues[NUM_ROWS - 2] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
expectedValues[NUM_ROWS - 1] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
- testTransformFunction(transformFunction, expectedValues);
+ testStringTransformFunc(transformFunction, expectedValues);
}
@Test
- public void testTransformWithDefaultValue() {
+ public void testClpDecodeWithDefaultValue() {
String defaultValue = "default";
ExpressionContext expression = RequestContextUtils.getExpression(
String.format("%s(%s,%s,%s,'%s')",
TransformFunctionType.CLP_DECODE.getName(), LOGTYPE_COLUMN, DICT_VARS_COLUMN,
@@ -185,14 +185,13 @@ public class CLPDecodeTransformFunctionTest {
String[] expectedValues = new String[NUM_ROWS];
Arrays.fill(expectedValues, TEST_MESSAGE);
- expectedValues[NUM_ROWS - 3] = defaultValue;
expectedValues[NUM_ROWS - 2] = defaultValue;
expectedValues[NUM_ROWS - 1] = DEFAULT_DIMENSION_NULL_VALUE_OF_STRING;
- testTransformFunction(transformFunction, expectedValues);
+ testStringTransformFunc(transformFunction, expectedValues);
}
@Test
- public void testInvalidArgs() {
+ public void testClpDecodeWithInvalidArg() {
String defaultValue = "default";
// 1st parameter literal
@@ -235,10 +234,100 @@ public class CLPDecodeTransformFunctionTest {
});
}
- private void testTransformFunction(TransformFunction transformFunction,
String[] expectedValues) {
+  @Test
+  public void testClpEncodedVarsMatch() {
+    // "*51*" should encode into three subqueries, in this order: one for an encoded integer var, one for an encoded
+    // float var, and one for a dictionary var. Only the first two are evaluated here. Against the test data, the
+    // integer-var subquery is expected not to match and the float-var subquery is expected to match.
+    final String wildcardQuery = "*51*";
+    testClpEncodedVarsMatch(wildcardQuery, 0, false);
+    testClpEncodedVarsMatch(wildcardQuery, 1, true);
+  }
+
+ @Test
+ public void testClpEncodedVarsMatchWithInvalidArg() {
+ String wildcardQuery = "*123*";
+ long subqueryIdx = 0;
+
+ // 1st parameter literal
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s('%s',%s,'%s',%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 2nd parameter literal
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,'%s','%s',%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 3rd parameter identifier
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s,%s,%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, ENCODED_VARS_COLUMN, subqueryIdx));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // 4th parameter identifier
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s('%s',%s,'%s',%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(),
+ LOGTYPE_COLUMN, ENCODED_VARS_COLUMN, wildcardQuery,
ENCODED_VARS_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+
+ // Missing args
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s)",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ assertThrows(BadQueryRequestException.class, () -> {
+ ExpressionContext expression = RequestContextUtils.getExpression(
+ String.format("%s(%s,%s,'%s')",
TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(), LOGTYPE_COLUMN,
+ ENCODED_VARS_COLUMN, wildcardQuery));
+ TransformFunctionFactory.get(expression, _dataSourceMap);
+ });
+ }
+
+  /**
+   * Builds clpEncodedVarsMatch(logtype, encodedVars, wildcardQuery, subqueryIdx) over the test segment and checks that
+   * every row yields {@code shouldMatch}, except the last row, which holds a null and so must never match.
+   */
+  private void testClpEncodedVarsMatch(String wildcardQuery, int subqueryIdx, boolean shouldMatch) {
+    String functionCall = String.format("%s(%s,%s,'%s',%s)", TransformFunctionType.CLP_ENCODED_VARS_MATCH.getName(),
+        LOGTYPE_COLUMN, ENCODED_VARS_COLUMN, wildcardQuery, subqueryIdx);
+    TransformFunction transformFunction =
+        TransformFunctionFactory.get(RequestContextUtils.getExpression(functionCall), _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof ClpEncodedVarsMatchTransformFunction);
+
+    int[] expectedValues = new int[NUM_ROWS];
+    // Every row but the last gets the expected outcome; the last (null) row keeps the default 0
+    Arrays.fill(expectedValues, 0, NUM_ROWS - 1, shouldMatch ? 1 : 0);
+    testIntTransformFunc(transformFunction, expectedValues);
+  }
+
+ /**
+ * Evaluates {@code transformFunction} as single-value strings over the test projection block and asserts that each
+ * of the first NUM_ROWS values equals the corresponding entry of {@code expectedValues}.
+ */
+ private void testStringTransformFunc(TransformFunction transformFunction,
String[] expectedValues) {
String[] values =
transformFunction.transformToStringValuesSV(_projectionBlock);
for (int i = 0; i < NUM_ROWS; i++) {
assertEquals(values[i], expectedValues[i]);
}
}
+
+  /**
+   * Evaluates {@code transformFunction} as single-value ints over the test projection block and asserts that each of
+   * the first NUM_ROWS values equals the corresponding entry of {@code expectedValues}.
+   */
+  private void testIntTransformFunc(TransformFunction transformFunction, int[] expectedValues) {
+    int[] actualValues = transformFunction.transformToIntValuesSV(_projectionBlock);
+    int rowIdx = 0;
+    while (rowIdx < NUM_ROWS) {
+      assertEquals(actualValues[rowIdx], expectedValues[rowIdx]);
+      rowIdx++;
+    }
+  }
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index 355b7e7ee1..16ca7749ba 100644
---
a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++
b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.BaseRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;
-import org.apache.pinot.sql.parsers.rewriter.CLPDecodeRewriter;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,8 +159,8 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
}
}
- to.putValue(key + CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
- to.putValue(key + CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX,
dictVars);
- to.putValue(key + CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX,
encodedVars);
+ to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+ to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+ to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 5008b5af43..72058be345 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -27,7 +27,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.sql.parsers.rewriter.CLPDecodeRewriter;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
import org.testng.annotations.Test;
import static
org.apache.pinot.plugin.inputformat.clplog.CLPLogRecordExtractorConfig.FIELDS_FOR_CLP_ENCODING_CONFIG_KEY;
@@ -136,9 +136,9 @@ public class CLPLogRecordExtractorTest {
}
+ /**
+ * Adds to {@code fields} the three column names derived from {@code fieldName} by appending the
+ * logtype, dictionary-vars and encoded-vars column suffixes defined on ClpRewriter.
+ */
private void addCLPEncodedField(String fieldName, Set<String> fields) {
- fields.add(fieldName + CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX);
- fields.add(fieldName + CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
- fields.add(fieldName + CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
+ fields.add(fieldName + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX);
}
private GenericRow extract(Map<String, String> props, Set<String>
fieldsToRead) {
@@ -163,12 +163,12 @@ public class CLPLogRecordExtractorTest {
try {
// Decode and validate field
assertNull(row.getValue(fieldName));
- String logtype = (String) row.getValue(fieldName +
CLPDecodeRewriter.LOGTYPE_COLUMN_SUFFIX);
+ String logtype = (String) row.getValue(fieldName +
ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
assertNotEquals(logtype, null);
String[] dictionaryVars =
- (String[]) row.getValue(fieldName +
CLPDecodeRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
+ (String[]) row.getValue(fieldName +
ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
assertNotEquals(dictionaryVars, null);
- Long[] encodedVars = (Long[]) row.getValue(fieldName +
CLPDecodeRewriter.ENCODED_VARS_COLUMN_SUFFIX);
+ Long[] encodedVars = (Long[]) row.getValue(fieldName +
ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX);
assertNotEquals(encodedVars, null);
long[] encodedVarsAsPrimitives =
Arrays.stream(encodedVars).mapToLong(Long::longValue).toArray();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]