agavra commented on code in PR #9792:
URL: https://github.com/apache/pinot/pull/9792#discussion_r1021928824


##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java:
##########
@@ -0,0 +1,212 @@
+
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private Operator<TransferableBlock> _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testUpstreamErrorBlock() {

Review Comment:
   suggestion (feel free to ignore) - test names that follow "shouldXXX" are 
generally more informative than "testXXX". For example, this should be 
`shouldPropagateUpstreamErrorBlock`, which explains exactly what the behavior 
should be, not just that you're testing some behavior.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java:
##########
@@ -41,12 +41,10 @@
  * This class is non-threadsafe. Do not reuse the stage planner for multiple 
query plans.
  */
 public class StagePlanner {
-  private final PlannerContext _plannerContext;
   private final WorkerManager _workerManager;
   private int _stageIdCounter;
 
   public StagePlanner(PlannerContext plannerContext, WorkerManager 
workerManager) {
-    _plannerContext = plannerContext;

Review Comment:
   @walterddr specifically asked to keep this here in one of my previous PRs



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java:
##########
@@ -41,22 +41,24 @@ public static FilterOperand toFilterOperand(RexExpression 
rexExpression, DataSch
     }
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.Literal literal) {
+  private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
     return new BooleanLiteral(literal);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, 
DataSchema dataSchema) {
+  private static FilterOperand toFilterOperand(RexExpression.InputRef 
inputRef, DataSchema dataSchema) {
     return new BooleanInputRef(inputRef, dataSchema);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.FunctionCall 
functionCall, DataSchema dataSchema) {
-
+  private static FilterOperand toFilterOperand(RexExpression.FunctionCall 
functionCall, DataSchema dataSchema) {
     switch 
(OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
       case "AND":
+        Preconditions.checkState(functionCall.getFunctionOperands().size() 
>=2, "AND takes >=2 argument");

Review Comment:
   tip: you can run `mvn checkstyle:check -T4` to run the linter - that would 
catch things like `>=2`, which I think needs to be `>= 2`



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java:
##########
@@ -0,0 +1,212 @@
+
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private Operator<TransferableBlock> _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testUpstreamErrorBlock() {
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("filterError")));
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN
+    });
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock error = errorBlock.getDataBlock();
+    
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+  }
+
+  @Test
+  public void testUpstreamEos() {

Review Comment:
   let's also make sure noop blocks are properly handled



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java:
##########
@@ -0,0 +1,212 @@
+
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private Operator<TransferableBlock> _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testUpstreamErrorBlock() {
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("filterError")));
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN
+    });
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock error = errorBlock.getDataBlock();
+    
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+  }
+
+  @Test
+  public void testUpstreamEos() {
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
 new Object[]{0}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], 0);

Review Comment:
   (suggestion) ideally each test checks only one thing - we don't need these 
assertions here since they are covered by the tests below



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java:
##########
@@ -41,22 +41,24 @@ public static FilterOperand toFilterOperand(RexExpression 
rexExpression, DataSch
     }
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.Literal literal) {
+  private static FilterOperand toFilterOperand(RexExpression.Literal literal) {
     return new BooleanLiteral(literal);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.InputRef inputRef, 
DataSchema dataSchema) {
+  private static FilterOperand toFilterOperand(RexExpression.InputRef 
inputRef, DataSchema dataSchema) {
     return new BooleanInputRef(inputRef, dataSchema);
   }
 
-  public static FilterOperand toFilterOperand(RexExpression.FunctionCall 
functionCall, DataSchema dataSchema) {
-
+  private static FilterOperand toFilterOperand(RexExpression.FunctionCall 
functionCall, DataSchema dataSchema) {
     switch 
(OperatorUtils.canonicalizeFunctionName(functionCall.getFunctionName())) {
       case "AND":
+        Preconditions.checkState(functionCall.getFunctionOperands().size() 
>=2, "AND takes >=2 argument");

Review Comment:
   nit: it's useful (for all of these error messages) to include what was 
passed in, not only what's expected (that'll help us debug situations where we 
thought we were passing in the right thing)



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java:
##########
@@ -0,0 +1,212 @@
+
+
+package org.apache.pinot.query.runtime.operator;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.planner.logical.RexExpression;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class FilterOperatorTest {
+  private AutoCloseable _mocks;
+  @Mock
+  private Operator<TransferableBlock> _upstreamOperator;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void testUpstreamErrorBlock() {
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("filterError")));
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+    DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.BOOLEAN
+    });
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock errorBlock = op.getNextBlock();
+    Assert.assertTrue(errorBlock.isErrorBlock());
+    DataBlock error = errorBlock.getDataBlock();
+    
Assert.assertTrue(error.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE).contains("filterError"));
+  }
+
+  @Test
+  public void testUpstreamEos() {
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    
Mockito.when(_upstreamOperator.nextBlock()).thenReturn(OperatorTestUtil.block(inputSchema,
 new Object[]{0}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], 0);
+    dataBlock = op.getNextBlock();
+    Assert.assertTrue(dataBlock.isEndOfStreamBlock());
+  }
+
+  @Test
+  public void testTrueBooleanFilter() {
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new 
Object[]{1}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 2);
+    Assert.assertEquals(result.get(0)[0], 0);
+    Assert.assertEquals(result.get(1)[0], 1);
+  }
+
+  @Test
+  public void testFalseBooleanFilter() {
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.BOOLEAN, false);
+
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertTrue(result.isEmpty());
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*boolean literal.*")
+  public void testBadTypeLiteralFilter() {
+    RexExpression booleanLiteral = new 
RexExpression.Literal(FieldSpec.DataType.STRING, "false");
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
booleanLiteral);
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*Input has to be "
+      + "boolean type.*")
+  public void testInputRefFilterBadType() {
+    RexExpression ref0 = new RexExpression.InputRef(0);
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol"}, new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new 
Object[]{2}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref0);
+  }
+
+  @Test
+  public void testInputRefFilter() {
+    RexExpression ref1 = new RexExpression.InputRef(1);
+    DataSchema inputSchema = new DataSchema(new String[]{"intCol", "boolCol"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.BOOLEAN
+    });
+    Mockito.when(_upstreamOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, 
new Object[]{2, false}));
+    FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, 
ref1);
+    TransferableBlock dataBlock = op.getNextBlock();
+    Assert.assertFalse(dataBlock.isErrorBlock());
+    List<Object[]> result = dataBlock.getContainer();
+    Assert.assertEquals(result.size(), 1);
+    Assert.assertEquals(result.get(0)[0], 1);
+    Assert.assertEquals(result.get(0)[1], true);
+  }
+
+  @Test
+  public void testAndFilter() {

Review Comment:
   suggestion: instead of testing the entire `FilterOperator`, why not make 
each of the operand classes (e.g. `AndOperand`) package-private so that we can 
test only that specific code? (I believe that's what you are testing here - and 
it also ensures that anywhere the operand is reused, such as 
`HashJoinOperator`, has coverage for it too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to