agavra commented on code in PR #9792:
URL: https://github.com/apache/pinot/pull/9792#discussion_r1022150683
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java:
##########
@@ -0,0 +1,212 @@
+
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+ private AutoCloseable _mocks;
+ @Mock
+ private Operator<TransferableBlock> _upstreamOperator;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void testUpstreamErrorBlock() {
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("filterError")));
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+ DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.BOOLEAN
+ });
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock errorBlock = op.getNextBlock();
+ Assert.assertTrue(errorBlock.isErrorBlock());
+ DataBlock error = errorBlock.getDataBlock();
+
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+ }
+
+ @Test
+ public void testUpstreamEos() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
new Object[]{0}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], 0);
+ dataBlock = op.getNextBlock();
+ Assert.assertTrue(dataBlock.isEndOfStreamBlock());
+ }
+
+ @Test
+ public void testTrueBooleanFilter() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new
Object[]{1}))
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 2);
+ Assert.assertEquals(result.get(0)[0], 0);
+ Assert.assertEquals(result.get(1)[0], 1);
+ }
+
+ @Test
+ public void testFalseBooleanFilter() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false);
+
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertTrue(result.isEmpty());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*boolean literal.*")
+ public void testBadTypeLiteralFilter() {
+ RexExpression booleanLiteral = new
RexExpression.Literal(FieldSpec.DataType.STRING, "false");
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
booleanLiteral);
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*Input has to be "
+ + "boolean type.*")
+ public void testInputRefFilterBadType() {
+ RexExpression ref0 = new RexExpression.InputRef(0);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new
DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new
Object[]{2}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref0);
+ }
+
+ @Test
+ public void testInputRefFilter() {
+ RexExpression ref1 = new RexExpression.InputRef(1);
+ DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN
+ });
+ Mockito.when(_upstreamOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true},
new Object[]{2, false}));
+ FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema,
ref1);
+ TransferableBlock dataBlock = op.getNextBlock();
+ Assert.assertFalse(dataBlock.isErrorBlock());
+ List<Object[]> result = dataBlock.getContainer();
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.get(0)[0], 1);
+ Assert.assertEquals(result.get(0)[1], true);
+ }
+
+ @Test
+ public void testAndFilter() {
Review Comment:
For the sake of argument (I'm happy to leave it as is): `I feel FilterOperator is our end API` is where I think we disagree 😉
I feel that "our end API" is SQL. `FilterOperator` has only one job/API:
take rows and pass them to the underlying operator. If it does that, then it
did its job correctly. Similarly, each operand should do its job correctly -
the API for an operand is to take a row and determine whether or not that row
should be included.
Making sure they work together when a SQL query is issued is the job of an
integration test, and that should be tested at the top level (via the new JSON
query runner tests).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]