This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6abfbfb6f7 [multistage] fix function usage of casting (#9909)
6abfbfb6f7 is described below
commit 6abfbfb6f738fc8b19778595d63d31b2f96a5ae1
Author: Rong Rong <[email protected]>
AuthorDate: Fri Dec 9 14:30:33 2022 -0800
[multistage] fix function usage of casting (#9909)
This PR addresses two issues with type-casting.
1. `DataSchema.ColumnDataType.convert()` assumes the underlying
serialized data type, so it should not be used for anything other than numeric columns.
2. `functionInvoker` only casts inputs/operands into the correct parameter
Java type; it does not do the same for the output.
This PR:
- fixed the 1st issue, by
- creating a stricter failure mechanism in the boolean checker
- always doing numeric casting on the input/operand type
- fixed the 2nd issue, by
- allowing return type casting in TransformOperator
- making sure the casting handles null
- general improvements:
- merged the filter operand into the transform operand, so that a filter clause can
also be used during a transform, such as: `SELECT boolCol AND (intCol > 0) FROM
tbl`
- unified RelDataType resolution to DataSchema.ColumnDataType,
PinotDataType and FieldSpec.DataType
TODOs
- the casting doesn't need to be checked on a per-row basis, because the
operator has the return schema: this means the casting can be hard-coded when
required, based on the inferred data type
- add more tests
Co-authored-by: Rong Rong <[email protected]>
---
.../org/apache/pinot/query/catalog/PinotTable.java | 2 +-
.../query/planner/logical/RelToStageConverter.java | 28 ++-
.../pinot/query/planner/logical/RexExpression.java | 56 +----
.../query/planner/logical/RexExpressionUtils.java | 12 +-
.../apache/pinot/query/routing/WorkerManager.java | 2 +-
.../org/apache/pinot/query/type/TypeSystem.java | 12 ++
.../query/mailbox/MultiplexingMailboxService.java | 2 +-
.../query/runtime/operator/FilterOperator.java | 11 +-
.../query/runtime/operator/HashJoinOperator.java | 13 +-
.../LeafStageTransferableBlockOperator.java | 2 +-
.../query/runtime/operator/TransformOperator.java | 4 +-
.../runtime/operator/operands/FilterOperand.java | 226 ++++-----------------
.../runtime/operator/operands/FunctionOperand.java | 2 +-
.../operator/operands/TransformOperand.java | 103 +++++++++-
.../operator/utils/FunctionInvokeUtils.java | 45 ++++
.../operator/{ => utils}/OperatorUtils.java | 2 +-
.../query/runtime/operator/FilterOperatorTest.java | 13 +-
.../runtime/operator/TransformOperatorTest.java | 4 +-
.../src/test/resources/queries/TypeCasting.json | 163 +++++++++++++++
19 files changed, 432 insertions(+), 270 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
index 23e6444e90..36a5f3730c 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotTable.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.catalog;
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.type.RelDataType;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
index fcd19ad4c5..72976185aa 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToStageConverter.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
@@ -43,6 +44,7 @@ import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.spi.data.FieldSpec;
/**
@@ -131,7 +133,7 @@ public final class RelToStageConverter {
String[] columnNames = recordType.getFieldNames().toArray(new
String[]{});
DataSchema.ColumnDataType[] columnDataTypes = new
DataSchema.ColumnDataType[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
- columnDataTypes[i] =
convertColumnDataType(recordType.getFieldList().get(i));
+ columnDataTypes[i] =
convertToColumnDataType(recordType.getFieldList().get(i).getType());
}
return new DataSchema(columnNames, columnDataTypes);
} else {
@@ -139,8 +141,8 @@ public final class RelToStageConverter {
}
}
- private static DataSchema.ColumnDataType
convertColumnDataType(RelDataTypeField relDataTypeField) {
- switch (relDataTypeField.getType().getSqlTypeName()) {
+ public static DataSchema.ColumnDataType convertToColumnDataType(RelDataType
relDataType) {
+ switch (relDataType.getSqlTypeName()) {
case BOOLEAN:
return DataSchema.ColumnDataType.BOOLEAN;
case TINYINT:
@@ -150,7 +152,7 @@ public final class RelToStageConverter {
case BIGINT:
return DataSchema.ColumnDataType.LONG;
case DECIMAL:
- return resolveDecimal(relDataTypeField);
+ return resolveDecimal(relDataType);
case FLOAT:
return DataSchema.ColumnDataType.FLOAT;
case REAL:
@@ -167,21 +169,29 @@ public final class RelToStageConverter {
case VARBINARY:
return DataSchema.ColumnDataType.BYTES;
default:
- throw new IllegalStateException("Unexpected RelDataTypeField: " +
relDataTypeField.getType() + " for column: "
- + relDataTypeField.getName());
+ return DataSchema.ColumnDataType.BYTES;
}
}
+ public static FieldSpec.DataType convertToFieldSpecDataType(RelDataType
relDataType) {
+ return convertToColumnDataType(relDataType).toDataType();
+ }
+
+ public static PinotDataType convertToPinotDataType(RelDataType relDataType) {
+ return
PinotDataType.getPinotDataTypeForExecution(convertToColumnDataType(relDataType));
+ }
+
/**
* Calcite uses DEMICAL type to infer data type hoisting and infer
arithmetic result types. down casting this
* back to the proper primitive type for Pinot.
*
* @param relDataType the DECIMAL rel data type.
* @return proper {@link DataSchema.ColumnDataType}.
+ * @see {@link org.apache.calcite.rel.type.RelDataTypeFactoryImpl#decimalOf}.
*/
- private static DataSchema.ColumnDataType resolveDecimal(RelDataTypeField
relDataType) {
- int precision = relDataType.getType().getPrecision();
- int scale = relDataType.getType().getScale();
+ private static DataSchema.ColumnDataType resolveDecimal(RelDataType
relDataType) {
+ int precision = relDataType.getPrecision();
+ int scale = relDataType.getScale();
if (scale == 0) {
if (precision <= 10) {
return DataSchema.ColumnDataType.INT;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
index e9a4a99679..7ef42ff7c9 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpression.java
@@ -22,14 +22,12 @@ import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.NlsString;
-import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.query.planner.serde.ProtoProperties;
import org.apache.pinot.spi.data.FieldSpec;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -49,7 +47,7 @@ public interface RexExpression {
return new RexExpression.InputRef(((RexInputRef) rexNode).getIndex());
} else if (rexNode instanceof RexLiteral) {
RexLiteral rexLiteral = ((RexLiteral) rexNode);
- FieldSpec.DataType dataType = toDataType(rexLiteral.getType());
+ FieldSpec.DataType dataType =
RelToStageConverter.convertToFieldSpecDataType(rexLiteral.getType());
return new RexExpression.Literal(dataType, toRexValue(dataType,
rexLiteral.getValue()));
} else if (rexNode instanceof RexCall) {
RexCall rexCall = (RexCall) rexNode;
@@ -70,37 +68,17 @@ public interface RexExpression {
default:
List<RexExpression> operands =
rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
- return new RexExpression.FunctionCall(rexCall.getKind(),
toDataType(rexCall.getType()),
+ return new RexExpression.FunctionCall(rexCall.getKind(),
+ RelToStageConverter.convertToFieldSpecDataType(rexCall.getType()),
rexCall.getOperator().getName(), operands);
}
}
- static PinotDataType toPinotDataType(RelDataType type) {
- switch (type.getSqlTypeName()) {
- case INTEGER:
- return PinotDataType.INTEGER;
- case BIGINT:
- return PinotDataType.LONG;
- case FLOAT:
- return PinotDataType.FLOAT;
- // TODO: support DECIMAL properly.
- case DECIMAL:
- case DOUBLE:
- return PinotDataType.DOUBLE;
- case CHAR:
- case VARCHAR:
- return PinotDataType.STRING;
- case BOOLEAN:
- return PinotDataType.BOOLEAN;
- default:
- throw new IllegalArgumentException("Unsupported data type: " + type);
- }
- }
-
static RexExpression toRexExpression(AggregateCall aggCall) {
List<RexExpression> operands =
aggCall.getArgList().stream().map(InputRef::new).collect(Collectors.toList());
- return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(),
toDataType(aggCall.getType()),
- aggCall.getAggregation().getName(), operands);
+ return new RexExpression.FunctionCall(aggCall.getAggregation().getKind(),
+ RelToStageConverter.convertToFieldSpecDataType(aggCall.getType()),
aggCall.getAggregation().getName(),
+ operands);
}
static Object toRexValue(FieldSpec.DataType dataType, Comparable value) {
@@ -121,28 +99,6 @@ public interface RexExpression {
}
}
- static FieldSpec.DataType toDataType(RelDataType type) {
- switch (type.getSqlTypeName()) {
- case INTEGER:
- return FieldSpec.DataType.INT;
- case BIGINT:
- return FieldSpec.DataType.LONG;
- case FLOAT:
- return FieldSpec.DataType.FLOAT;
- case DECIMAL:
- case DOUBLE:
- return FieldSpec.DataType.DOUBLE;
- case CHAR:
- case VARCHAR:
- return FieldSpec.DataType.STRING;
- case BOOLEAN:
- return FieldSpec.DataType.BOOLEAN;
- default:
- // TODO: do not assume byte type.
- return FieldSpec.DataType.BYTES;
- }
- }
-
class InputRef implements RexExpression {
@ProtoProperties
private SqlKind _sqlKind;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index 314b0baa54..364da7c164 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -43,7 +43,8 @@ public class RexExpressionUtils {
static RexExpression handleCase(RexCall rexCall) {
List<RexExpression> operands =
rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
- return new RexExpression.FunctionCall(rexCall.getKind(),
RexExpression.toDataType(rexCall.getType()),
+ return new RexExpression.FunctionCall(rexCall.getKind(),
+ RelToStageConverter.convertToFieldSpecDataType(rexCall.getType()),
"caseWhen", operands);
}
@@ -54,9 +55,10 @@ public class RexExpressionUtils {
rexCall.getOperands().stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
Preconditions.checkState(operands.size() == 1, "CAST takes exactly 2
arguments");
RelDataType castType = rexCall.getType();
- operands.add(new RexExpression.Literal(FieldSpec.DataType.STRING,
RexExpression.toPinotDataType(castType).name()));
- return new RexExpression.FunctionCall(rexCall.getKind(),
RexExpression.toDataType(castType), "CAST",
- operands);
+ operands.add(new RexExpression.Literal(FieldSpec.DataType.STRING,
+ RelToStageConverter.convertToFieldSpecDataType(castType).name()));
+ return new RexExpression.FunctionCall(rexCall.getKind(),
RelToStageConverter.convertToFieldSpecDataType(castType),
+ "CAST", operands);
}
// TODO: Add support for range filter expressions (e.g. a > 0 and a < 30)
@@ -64,7 +66,7 @@ public class RexExpressionUtils {
List<RexNode> operands = rexCall.getOperands();
RexInputRef rexInputRef = (RexInputRef) operands.get(0);
RexLiteral rexLiteral = (RexLiteral) operands.get(1);
- FieldSpec.DataType dataType =
RexExpression.toDataType(rexLiteral.getType());
+ FieldSpec.DataType dataType =
RelToStageConverter.convertToFieldSpecDataType(rexLiteral.getType());
Sarg sarg = rexLiteral.getValueAs(Sarg.class);
if (sarg.isPoints()) {
return new RexExpression.FunctionCall(SqlKind.IN, dataType,
SqlKind.IN.name(), toFunctionOperands(rexInputRef,
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 42bb19d269..36c0f2bd81 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.routing;
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
index ea6a9e7a35..5476c8ad23 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/type/TypeSystem.java
@@ -25,6 +25,8 @@ import org.apache.calcite.rel.type.RelDataTypeSystemImpl;
* The {@code TypeSystem} overwrites Calcite type system with Pinot specific
logics.
*/
public class TypeSystem extends RelDataTypeSystemImpl {
+ private static final int MAX_DECIMAL_SCALE_DIGIT = 1000;
+ private static final int MAX_DECIMAL_PRECISION_DIGIT = 1000;
@Override
public boolean shouldConvertRaggedUnionTypesToVarying() {
@@ -38,4 +40,14 @@ public class TypeSystem extends RelDataTypeSystemImpl {
// behavior. This calcite flag will cause this to be cast to VARCHAR
instead
return true;
}
+
+ @Override
+ public int getMaxNumericScale() {
+ return MAX_DECIMAL_SCALE_DIGIT;
+ }
+
+ @Override
+ public int getMaxNumericPrecision() {
+ return MAX_DECIMAL_PRECISION_DIGIT;
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
index f8bf271233..e80a65ce71 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MultiplexingMailboxService.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
import java.util.function.Consumer;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.spi.env.PinotConfiguration;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index 66956a95ec..32fe4508cf 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -28,7 +28,9 @@ import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+
/*
FilterOperator apply filter on rows from upstreamOperator.
@@ -46,14 +48,14 @@ import
org.apache.pinot.query.runtime.operator.operands.FilterOperand;
public class FilterOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "FILTER";
private final Operator<TransferableBlock> _upstreamOperator;
- private final FilterOperand _filterOperand;
+ private final TransformOperand _filterOperand;
private final DataSchema _dataSchema;
private TransferableBlock _upstreamErrorBlock;
public FilterOperator(Operator<TransferableBlock> upstreamOperator,
DataSchema dataSchema, RexExpression filter) {
_upstreamOperator = upstreamOperator;
_dataSchema = dataSchema;
- _filterOperand = FilterOperand.toFilterOperand(filter, dataSchema);
+ _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema);
_upstreamErrorBlock = null;
}
@@ -78,6 +80,7 @@ public class FilterOperator extends
BaseOperator<TransferableBlock> {
}
}
+ @SuppressWarnings("ConstantConditions")
private TransferableBlock transform(TransferableBlock block)
throws Exception {
if (_upstreamErrorBlock != null) {
@@ -92,7 +95,7 @@ public class FilterOperator extends
BaseOperator<TransferableBlock> {
List<Object[]> resultRows = new ArrayList<>();
List<Object[]> container = block.getContainer();
for (Object[] row : container) {
- if (_filterOperand.apply(row)) {
+ if ((Boolean) FunctionInvokeUtils.convert(_filterOperand.apply(row),
DataSchema.ColumnDataType.BOOLEAN)) {
resultRows.add(row);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index a0bc9329c7..89ff2f1abb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
@@ -37,7 +37,8 @@ import
org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.operands.FilterOperand;
+import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
/**
@@ -61,7 +62,7 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
private final JoinRelType _joinType;
private final DataSchema _resultSchema;
private final int _resultRowSize;
- private final List<FilterOperand> _joinClauseEvaluators;
+ private final List<TransformOperand> _joinClauseEvaluators;
private boolean _isHashTableBuilt;
private TransferableBlock _upstreamErrorBlock;
private KeySelector<Object[], Object[]> _leftKeySelector;
@@ -80,7 +81,7 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
_resultSchema = outputSchema;
_joinClauseEvaluators = new ArrayList<>(joinClauses.size());
for (RexExpression joinClause : joinClauses) {
- _joinClauseEvaluators.add(FilterOperand.toFilterOperand(joinClause,
_resultSchema));
+
_joinClauseEvaluators.add(TransformOperand.toTransformOperand(joinClause,
_resultSchema));
}
_joinType = joinType;
_resultRowSize = _resultSchema.size();
@@ -168,8 +169,8 @@ public class HashJoinOperator extends
BaseOperator<TransferableBlock> {
for (Object[] rightRow : hashCollection) {
// TODO: Optimize this to avoid unnecessary object copy.
Object[] resultRow = joinRow(leftRow, rightRow);
- if (_joinClauseEvaluators.isEmpty() || _joinClauseEvaluators.stream()
- .allMatch(evaluator -> evaluator.apply(resultRow))) {
+ if (_joinClauseEvaluators.isEmpty() ||
_joinClauseEvaluators.stream().allMatch(evaluator ->
+ (Boolean)
FunctionInvokeUtils.convert(evaluator.apply(resultRow),
DataSchema.ColumnDataType.BOOLEAN))) {
rows.add(resultRow);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
index 41f8b4535e..582e85a9ec 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;
-import com.clearspring.analytics.util.Preconditions;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index e00d4a9eed..80af357cec 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -30,6 +30,7 @@ import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.operator.operands.TransformOperand;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
/**
@@ -104,7 +105,8 @@ public class TransformOperator extends
BaseOperator<TransferableBlock> {
for (Object[] row : container) {
Object[] resultRow = new Object[_resultColumnSize];
for (int i = 0; i < _resultColumnSize; i++) {
- resultRow[i] = _transformOperandsList.get(i).apply(row);
+ resultRow[i] =
FunctionInvokeUtils.convert(_transformOperandsList.get(i).apply(row),
+ _resultSchema.getColumnDataType(i));
}
resultRows.add(resultRow);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 169643b24b..37a932c594 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -18,167 +18,33 @@
*/
package org.apache.pinot.query.runtime.operator.operands;
-import com.clearspring.analytics.util.Preconditions;
+
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.runtime.operator.OperatorUtils;
-import org.apache.pinot.spi.data.FieldSpec;
public abstract class FilterOperand extends TransformOperand {
- public static FilterOperand toFilterOperand(RexExpression rexExpression,
DataSchema dataSchema) {
- if (rexExpression instanceof RexExpression.FunctionCall) {
- return toFilterOperand((RexExpression.FunctionCall) rexExpression,
dataSchema);
- } else if (rexExpression instanceof RexExpression.InputRef) {
- return toFilterOperand((RexExpression.InputRef) rexExpression,
dataSchema);
- } else if (rexExpression instanceof RexExpression.Literal) {
- return toFilterOperand((RexExpression.Literal) rexExpression);
- } else {
- throw new UnsupportedOperationException("Unsupported expression on
filter conversion: " + rexExpression);
- }
- }
-
- private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
- return new BooleanLiteral(literal);
- }
-
- private static FilterOperand toFilterOperand(RexExpression.InputRef
inputRef, DataSchema dataSchema) {
- return new BooleanInputRef(inputRef, dataSchema);
- }
-
- private static FilterOperand toFilterOperand(RexExpression.FunctionCall
functionCall, DataSchema dataSchema) {
- int operandSize = functionCall.getFunctionOperands().size();
- // TODO: Move these functions out of this class.
- switch
(OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
- case "AND":
- Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument,
passed in argument size:" + operandSize);
- return new And(functionCall.getFunctionOperands(), dataSchema);
- case "OR":
- Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument,
passed in argument size:" + operandSize);
- return new Or(functionCall.getFunctionOperands(), dataSchema);
- case "NOT":
- Preconditions.checkState(operandSize == 1, "NOT takes one argument,
passed in argument size:" + operandSize);
- return new
Not(toFilterOperand(functionCall.getFunctionOperands().get(0), dataSchema));
- case "equals":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- == 0;
- }
- };
- case "notEquals":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- != 0;
- }
- };
- case "greaterThan":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- > 0;
- }
- };
- case "greaterThanOrEqual":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- >= 0;
- }
- };
- case "lessThan":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- < 0;
- }
- };
- case "lessThanOrEqual":
- return new Predicate(functionCall.getFunctionOperands(), dataSchema) {
- @Override
- public Boolean apply(Object[] row) {
- return ((Comparable)
_resultType.convert(_lhs.apply(row))).compareTo(_resultType.convert(_rhs.apply(row)))
- <= 0;
- }
- };
- default:
- return new BooleanFunction(functionCall, dataSchema);
- }
- }
-
@Override
public abstract Boolean apply(Object[] row);
- private static class BooleanFunction extends FilterOperand {
- private final FunctionOperand _func;
+ public static class And extends FilterOperand {
+ List<TransformOperand> _childOperands;
- public BooleanFunction(RexExpression.FunctionCall functionCall, DataSchema
dataSchema) {
- FunctionOperand func = (FunctionOperand)
TransformOperand.toTransformOperand(functionCall, dataSchema);
- Preconditions.checkState(func.getResultType() ==
DataSchema.ColumnDataType.BOOLEAN,
- "Expecting boolean result type but got type:" +
func.getResultType());
- _func = func;
- }
-
- @Override
- public Boolean apply(Object[] row) {
- return (Boolean) _func.apply(row);
- }
- }
-
- private static class BooleanInputRef extends FilterOperand {
- private final RexExpression.InputRef _inputRef;
-
- public BooleanInputRef(RexExpression.InputRef inputRef, DataSchema
dataSchema) {
- DataSchema.ColumnDataType inputType =
dataSchema.getColumnDataType(inputRef.getIndex());
- Preconditions.checkState(inputType == DataSchema.ColumnDataType.BOOLEAN,
- "Input has to be boolean type but got type:" + inputType);
- _inputRef = inputRef;
- }
-
- @Override
- public Boolean apply(Object[] row) {
- return (boolean) row[_inputRef.getIndex()];
- }
- }
-
- private static class BooleanLiteral extends FilterOperand {
- private final Object _literalValue;
-
- public BooleanLiteral(RexExpression.Literal literal) {
- Preconditions.checkState(literal.getDataType() ==
FieldSpec.DataType.BOOLEAN,
- "Only boolean literal is supported as filter, but got type:" +
literal.getDataType());
- _literalValue = literal.getValue();
- }
-
- @Override
- public Boolean apply(Object[] row) {
- return (boolean) _literalValue;
- }
- }
-
- private static class And extends FilterOperand {
- List<FilterOperand> _childOperands;
-
- public And(List<RexExpression> childExprs, DataSchema dataSchema) {
+ public And(List<RexExpression> childExprs, DataSchema inputDataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
- _childOperands.add(toFilterOperand(childExpr, dataSchema));
+ _childOperands.add(toTransformOperand(childExpr, inputDataSchema));
}
}
@Override
public Boolean apply(Object[] row) {
- for (FilterOperand child : _childOperands) {
- if (!child.apply(row)) {
+ for (TransformOperand child : _childOperands) {
+ if (!(Boolean) child.apply(row)) {
return false;
}
}
@@ -186,20 +52,20 @@ public abstract class FilterOperand extends
TransformOperand {
}
}
- private static class Or extends FilterOperand {
- List<FilterOperand> _childOperands;
+ public static class Or extends FilterOperand {
+ List<TransformOperand> _childOperands;
- public Or(List<RexExpression> childExprs, DataSchema dataSchema) {
+ public Or(List<RexExpression> childExprs, DataSchema inputDataSchema) {
_childOperands = new ArrayList<>(childExprs.size());
for (RexExpression childExpr : childExprs) {
- _childOperands.add(toFilterOperand(childExpr, dataSchema));
+ _childOperands.add(toTransformOperand(childExpr, inputDataSchema));
}
}
@Override
public Boolean apply(Object[] row) {
- for (FilterOperand child : _childOperands) {
- if (child.apply(row)) {
+ for (TransformOperand child : _childOperands) {
+ if ((Boolean) child.apply(row)) {
return true;
}
}
@@ -207,34 +73,28 @@ public abstract class FilterOperand extends
TransformOperand {
}
}
- private static class Not extends FilterOperand {
- FilterOperand _childOperand;
+ public static class Not extends FilterOperand {
+ TransformOperand _childOperand;
- public Not(FilterOperand childOperand) {
- _childOperand = childOperand;
+ public Not(RexExpression childExpr, DataSchema inputDataSchema) {
+ _childOperand = toTransformOperand(childExpr, inputDataSchema);
}
@Override
public Boolean apply(Object[] row) {
- return !_childOperand.apply(row);
+ return !(Boolean) _childOperand.apply(row);
}
}
- private static abstract class Predicate extends FilterOperand {
+ public static abstract class Predicate extends FilterOperand {
protected final TransformOperand _lhs;
protected final TransformOperand _rhs;
- protected final DataSchema.ColumnDataType _resultType;
-
- public Predicate(List<RexExpression> functionOperands, DataSchema
dataSchema) {
- Preconditions.checkState(functionOperands.size() == 2,
- "Expected 2 function ops for Predicate but got:" +
functionOperands.size());
- _lhs = TransformOperand.toTransformOperand(functionOperands.get(0),
dataSchema);
- _rhs = TransformOperand.toTransformOperand(functionOperands.get(1),
dataSchema);
- _resultType = resolveResultType(_lhs._resultType, _rhs._resultType);
- }
+ protected final boolean _requireCasting;
+ protected final DataSchema.ColumnDataType _commonCastType;
/**
- * Resolve data type, since we don't have a exhausted list of filter
function signatures. we rely on type casting.
+ * The Predicate constructor also resolves the common data type,
+ * since we don't have an exhaustive list of filter function signatures; we
rely on type casting.
*
* <ul>
* <li>if both RHS and LHS has null data type, exception occurs.</li>
@@ -243,28 +103,32 @@ public abstract class FilterOperand extends
TransformOperand {
* <li>if we can't resolve a common data type, exception occurs.</li>
* </ul>
*
- * @param lhsType left-hand-side type
- * @param rhsType right-hand-side type
- * @return best common conversion data type.
- * @see DataSchema.ColumnDataType#isSuperTypeOf(DataSchema.ColumnDataType)
+ *
*/
- private static DataSchema.ColumnDataType
resolveResultType(DataSchema.ColumnDataType lhsType,
- DataSchema.ColumnDataType rhsType) {
+ public Predicate(List<RexExpression> functionOperands, DataSchema
inputDataSchema) {
+ Preconditions.checkState(functionOperands.size() == 2,
+ "Expected 2 function ops for Predicate but got:" +
functionOperands.size());
+ _lhs = TransformOperand.toTransformOperand(functionOperands.get(0),
inputDataSchema);
+ _rhs = TransformOperand.toTransformOperand(functionOperands.get(1),
inputDataSchema);
+
// TODO: Correctly throw exception instead of returning null.
// Currently exception thrown during constructor is not piped back to
query dispatcher, thus in order to
// avoid silent failure, we deliberately set to null here, make the
exception thrown during data processing.
- if (lhsType == null && rhsType == null) {
- return null;
- } else if (lhsType == null || lhsType ==
DataSchema.ColumnDataType.OBJECT) {
- return rhsType;
- } else if (rhsType == null || rhsType ==
DataSchema.ColumnDataType.OBJECT) {
- return lhsType;
- } else if (lhsType.isSuperTypeOf(rhsType)) {
- return lhsType;
- } else if (rhsType.isSuperTypeOf(rhsType)) {
- return rhsType;
+ // TODO: right now all the numeric columns are still doing conversion
b/c even if the inputDataSchema asked for
+ // one of the number types, it might not contain that exact type in the
payload.
+ if (_lhs._resultType == null || _lhs._resultType ==
DataSchema.ColumnDataType.OBJECT
+ || _rhs._resultType == null || _rhs._resultType ==
DataSchema.ColumnDataType.OBJECT) {
+ _requireCasting = false;
+ _commonCastType = null;
+ } else if (_lhs._resultType.isSuperTypeOf(_rhs._resultType)) {
+ _requireCasting = _lhs._resultType != _rhs._resultType ||
_lhs._resultType.isNumber();
+ _commonCastType = _lhs._resultType;
+ } else if (_rhs._resultType.isSuperTypeOf(_lhs._resultType)) {
+ _requireCasting = _lhs._resultType != _rhs._resultType ||
_rhs._resultType.isNumber();
+ _commonCastType = _rhs._resultType;
} else {
- return null;
+ _requireCasting = false;
+ _commonCastType = null;
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
index 029f5ddaad..b5852bad11 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.function.FunctionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
-import org.apache.pinot.query.runtime.operator.OperatorUtils;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
/*
* FunctionOperands are generated from {@link RexExpression}s.
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
index 7618f74aea..0fc17ba407 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
@@ -18,19 +18,24 @@
*/
package org.apache.pinot.query.runtime.operator.operands;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils;
+import org.apache.pinot.query.runtime.operator.utils.OperatorUtils;
public abstract class TransformOperand {
protected String _resultName;
protected DataSchema.ColumnDataType _resultType;
- public static TransformOperand toTransformOperand(RexExpression
rexExpression, DataSchema dataSchema) {
+ public static TransformOperand toTransformOperand(RexExpression
rexExpression, DataSchema inputDataSchema) {
if (rexExpression instanceof RexExpression.InputRef) {
- return new ReferenceOperand((RexExpression.InputRef) rexExpression,
dataSchema);
+ return new ReferenceOperand((RexExpression.InputRef) rexExpression,
inputDataSchema);
} else if (rexExpression instanceof RexExpression.FunctionCall) {
- return new FunctionOperand((RexExpression.FunctionCall) rexExpression,
dataSchema);
+ return toTransformOperand((RexExpression.FunctionCall) rexExpression,
inputDataSchema);
} else if (rexExpression instanceof RexExpression.Literal) {
return new LiteralOperand((RexExpression.Literal) rexExpression);
} else {
@@ -38,6 +43,98 @@ public abstract class TransformOperand {
}
}
+ @SuppressWarnings({"ConstantConditions", "rawtypes", "unchecked"})
+ private static TransformOperand
toTransformOperand(RexExpression.FunctionCall functionCall,
+ DataSchema inputDataSchema) {
+ final List<RexExpression> functionOperands =
functionCall.getFunctionOperands();
+ int operandSize = functionOperands.size();
+ switch
(OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
+ case "AND":
+ Preconditions.checkState(operandSize >= 2, "AND takes >=2 argument,
passed in argument size:" + operandSize);
+ return new FilterOperand.And(functionOperands, inputDataSchema);
+ case "OR":
+ Preconditions.checkState(operandSize >= 2, "OR takes >=2 argument,
passed in argument size:" + operandSize);
+ return new FilterOperand.Or(functionOperands, inputDataSchema);
+ case "NOT":
+ Preconditions.checkState(operandSize == 1, "NOT takes one argument,
passed in argument size:" + operandSize);
+ return new FilterOperand.Not(functionOperands.get(0), inputDataSchema);
+ case "equals":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) == 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
== 0;
+ }
+ }
+ };
+ case "notEquals":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) != 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
!= 0;
+ }
+ }
+ };
+ case "greaterThan":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) > 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
> 0;
+ }
+ }
+ };
+ case "greaterThanOrEqual":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) >= 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
>= 0;
+ }
+ }
+ };
+ case "lessThan":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) < 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
< 0;
+ }
+ }
+ };
+ case "lessThanOrEqual":
+ return new FilterOperand.Predicate(functionOperands, inputDataSchema) {
+ @Override
+ public Boolean apply(Object[] row) {
+ if (_requireCasting) {
+ return ((Comparable)
FunctionInvokeUtils.convert(_lhs.apply(row), _commonCastType)).compareTo(
+ FunctionInvokeUtils.convert(_rhs.apply(row),
_commonCastType)) <= 0;
+ } else {
+ return ((Comparable) _lhs.apply(row)).compareTo(_rhs.apply(row))
<= 0;
+ }
+ }
+ };
+ default:
+ return new FunctionOperand(functionCall, inputDataSchema);
+ }
+ }
+
public String getResultName() {
return _resultName;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java
new file mode 100644
index 0000000000..de26cdfab4
--- /dev/null
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/FunctionInvokeUtils.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.utils;
+
+import org.apache.pinot.common.utils.DataSchema;
+
+
+public class FunctionInvokeUtils {
+
+ private FunctionInvokeUtils() {
+ // do not instantiate.
+ }
+
+ /**
+ * Convert result to the appropriate column data type according to the
desired {@link DataSchema.ColumnDataType}
+ * of the {@link org.apache.pinot.core.common.Operator}.
+ *
+ * @param inputObj input entry
+ * @param columnDataType desired column data type
+ * @return converted entry
+ */
+ public static Object convert(Object inputObj, DataSchema.ColumnDataType
columnDataType) {
+ if (columnDataType.isNumber() && columnDataType !=
DataSchema.ColumnDataType.BIG_DECIMAL) {
+ return inputObj == null ? null : columnDataType.convert(inputObj);
+ } else {
+ return inputObj;
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
similarity index 97%
rename from
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
rename to
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
index 2bad078e3f..a5042c99bb 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/OperatorUtils.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.query.runtime.operator;
+package org.apache.pinot.query.runtime.operator.utils;
import java.util.HashMap;
import java.util.Map;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
index 7b1f5bce5f..1e78233234 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java
@@ -130,7 +130,7 @@ public class FilterOperatorTest {
Assert.assertTrue(result.isEmpty());
}
- @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*boolean literal.*")
+ @Test
public void shouldThrowOnNonBooleanTypeBooleanLiteral() {
RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.STRING, "false");
DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
@@ -139,10 +139,13 @@ public class FilterOperatorTest {
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock errorBlock = op.getNextBlock();
+ Assert.assertTrue(errorBlock.isErrorBlock());
+ DataBlock data = errorBlock.getDataBlock();
+
Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("cast"));
}
- @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*Input has to be "
- + "boolean type.*")
+ @Test
public void shouldThrowOnNonBooleanTypeInputRef() {
RexExpression ref0 = new RexExpression.InputRef(0);
DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
@@ -151,6 +154,10 @@ public class FilterOperatorTest {
Mockito.when(_upstreamOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref0);
+ TransferableBlock errorBlock = op.getNextBlock();
+ Assert.assertTrue(errorBlock.isErrorBlock());
+ DataBlock data = errorBlock.getDataBlock();
+
Assert.assertTrue(data.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("cast"));
}
@Test
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
index 868ec3b9b0..9c6370b5f5 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java
@@ -194,8 +194,8 @@ public class TransformOperatorTest {
}));
RexExpression.Literal boolLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
RexExpression.Literal strLiteral = new
RexExpression.Literal(FieldSpec.DataType.STRING, "str");
- DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT,
DataSchema.ColumnDataType.STRING});
+ DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN,
DataSchema.ColumnDataType.STRING});
TransformOperator op =
new TransformOperator(_upstreamOp, resultSchema,
ImmutableList.of(boolLiteral, strLiteral), upStreamSchema);
TransferableBlock result = op.nextBlock();
diff --git a/pinot-query-runtime/src/test/resources/queries/TypeCasting.json
b/pinot-query-runtime/src/test/resources/queries/TypeCasting.json
new file mode 100644
index 0000000000..f134cfd4b2
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/TypeCasting.json
@@ -0,0 +1,163 @@
+{
+ "explicit_cast": {
+ "comment": "explicit type casting for operations",
+ "tables": {
+ "tbl": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+ {"name": "boolCol", "type": "BOOLEAN"},
+ {"name": "timestampCol", "type": "TIMESTAMP"},
+ {"name": "stringCol", "type": "STRING"},
+ {"name": "bytesCol", "type": "BYTES"}
+ ],
+ "inputs": [
+ [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456",
"lyons", "DEADBEEF"],
+ [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46",
"onan", "DE12BEEF"],
+ [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001",
"rudvalis", "A000"],
+ [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06",
"janko", "FEEE"],
+ [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00",
"baby", "1000"],
+ [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59",
"monster", "00"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "all types should conform when directly selected out",
+ "sql": "SELECT * FROM {tbl}"
+ },
+ {
+ "description": "all types should conform when directly selected out &
transfer between stages",
+ "sql": "SELECT * FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol =
b.intCol WHERE a.boolCol = true AND b.boolCol = false"
+ },
+ { "sql": "SELECT CAST(floatCol AS DOUBLE) * 1e100, CAST(intCol AS
BIGINT) * 2000000000, CAST(longCol AS DOUBLE) * 1e100, CAST(boolCol AS INT)
FROM {tbl}" },
+ { "sql": "SELECT CAST(a.floatCol AS DOUBLE) * 1e100, CAST(a.intCol AS
BIGINT) * 2000000000, CAST(b.longCol AS FLOAT) * 1e20, CAST(a.boolCol AS INT)
FROM {tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol WHERE a.boolCol = true"
},
+ {
+ "ignored": true,
+ "comments": "primitive cast not work: cast as DECIMAL doesn't work",
+ "sql": "SELECT CAST(doubleCol AS DECIMAL) FROM {tbl} WHERE
bigDecimalCol > 0 AND CAST(bytesCol AS VARCHAR) != '1000'"
+ },
+ {
+ "ignored": true,
+ "comments": "special cast not work: timestamp cast not supported,
varchar cast not supported",
+ "sql": "SELECT CAST(b.timestampCol AS BIGINT), CAST(stringCol AS
VARBINARY) FROM {tbl}"
+ }
+ ]
+ },
+ "function_operand_casting": {
+ "comment": "built-in function from SqlStdOperatorTable with
implicit/explicit casting",
+ "tables": {
+ "tbl": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+ {"name": "boolCol", "type": "BOOLEAN"},
+ {"name": "timestampCol", "type": "TIMESTAMP"},
+ {"name": "stringCol", "type": "STRING"},
+ {"name": "bytesCol", "type": "BYTES"}
+ ],
+ "inputs": [
+ [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456",
"lyons", "DEADBEEF"],
+ [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46",
"onan", "DE12BEEF"],
+ [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001",
"rudvalis", "A000"],
+ [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06",
"janko", "FEEE"],
+ [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00",
"baby", "1000"],
+ [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59",
"monster", "00"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "test (1) SqlStdOperator with Pinot ScalarFunction
(UPPER, SQRT); (2) SqlStdOperator with Pinot ScalarFunction multiple argument
(POWER); (3) SqlStdOperator with Pinot Filter (AND, =), (4) SqlStdOperator with
Pinot operator (+) ",
+ "sql": "SELECT UPPER(stringCol), POWER(intCol, 2), SQRT(longCol),
boolCol AND (intCol = 1), floatCol + 10 FROM {tbl}"
+ },
+ {
+ "description": "test SqlStdOperator with the 4 Pinot variants above, but
also mixed in with intermediate stage transfer",
+ "sql": "SELECT LOWER(a.stringCol), POWER(a.longCol, 3),
SQRT(b.intCol), a.boolCol AND (b.intCol = 1), a.floatCol + b.doubleCol FROM
{tbl} AS a JOIN {tbl} AS b ON a.intCol = b.intCol"
+ }
+ ]
+ },
+ "udf_argument_casting": {
+ "comment": "user-defined (or @ScalarFunction annotated) function with
implicit/explicit casting",
+ "tables": {
+ "tbl": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "bigDecimalCol", "type": "BIG_DECIMAL"},
+ {"name": "boolCol", "type": "BOOLEAN"},
+ {"name": "timestampCol", "type": "TIMESTAMP"},
+ {"name": "timestampStringCol", "type": "TIMESTAMP"},
+ {"name": "stringCol", "type": "STRING"},
+ {"name": "bytesCol", "type": "BYTES"}
+ ],
+ "inputs": [
+ [1, 14, 3.0, 5.176518e16, "1e505", true, "1970-01-01 01:02:03.456",
"123", "lyons", "DEADBEEF"],
+ [2, 21, 4.0, 4.608155e11, "1e350", true, "1999-01-08 22:05:46",
"123", "onan", "DE12BEEF"],
+ [3, 14, 5.0, 1.249261e11, "1e505", false, "1999-01-08 04:05:06.001",
"123", "rudvalis", "A000"],
+ [4, 21, 5.0, 8.677557e19, "1e404", false, "5760-01-01 04:05:06",
"123", "janko", "FEEE"],
+ [1, 41, 2.0, 4.154786e33, "1e505", true, "2022-01-02 03:45:00",
"123", "baby", "1000"],
+ [2, 46, 1.0, 8.080171e53, "1e401", false, "1969-12-31 23:59:59",
"123", "monster", "00"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "test (1) Pinot ScalarFunction (md5); (2) Pinot
ScalarFunction multiple argument (dateTrunc, substr); (3) Pinot filter
(regexpLike), (4) Pinot transform (dateTimeConvert) ",
+ "sql": "SELECT md5(bytesCol), substr(stringCol, 5),
regexpExtract(stringCol, '([\\w]+).*') FROM {tbl} WHERE regexpLike(stringCol,
'.*')",
+ "outputs": [
+ ["2f249230a8e7c2bf6005ccd2679259ec", "", "lyons"],
+ ["a85a5fd494d9a538e22b696159931c1b", "", "onan"],
+ ["982569213f522d8fce898806d0a2c357", "lis", "rudvalis"],
+ ["ae04dbae988ab45ebfba84c0c3612a50", "", "janko"],
+ ["d479436bd32066b25886f9920c7b7ccf", "", "baby"],
+ ["93b885adfe0da089cdf634904fd59f71", "er", "monster"]
+ ]
+ },
+ {
+ "description": "test the Pinot function variants above, but also mixed in
with intermediate stage transfer",
+ "sql": "SELECT md5(a.bytesCol), substr(b.stringCol, 5),
regexpExtract(a.stringCol, '([\\w]+).*') FROM {tbl} AS a JOIN {tbl} AS b ON
a.intCol = b.intCol WHERE regexpLike(a.stringCol, b.stringCol)",
+ "outputs": [
+ ["2f249230a8e7c2bf6005ccd2679259ec", "", "lyons"],
+ ["a85a5fd494d9a538e22b696159931c1b", "", "onan"],
+ ["982569213f522d8fce898806d0a2c357", "lis", "rudvalis"],
+ ["ae04dbae988ab45ebfba84c0c3612a50", "", "janko"],
+ ["d479436bd32066b25886f9920c7b7ccf", "", "baby"],
+ ["93b885adfe0da089cdf634904fd59f71", "er", "monster"]
+ ]
+ },
+ {
+ "ignored": true,
+ "comment": "problematic dateTimeConvert with weird exception on leaf
stage",
+ "sql": "SELECT dateTimeConvert(timestampStringCol,
'1:MILLISECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') FROM {tbl}",
+ "outputs": [
+ []
+ ]
+ },
+ {
+ "ignored": true,
+ "comment": "problematic round function not producing proper decimal
results type",
+ "sql": "SELECT ROUND(longCol, 2) FROM {tbl}",
+ "outputs": [
+ []
+ ]
+ },
+ {
+ "ignored": true,
+ "comment": "dateTrunc returns round up instead of round down results",
+ "sql": "SELECT dateTrunc('DAY', b.timestampCol) FROM {tbl}",
+ "outputs": [
+ []
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]