walterddr commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018240339


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. 
Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }

Review Comment:
   nit: move next to Accumulator? also make it private? i don't foresee anyone 
uses this interface and accumulate is already a private member class



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.BaseOperator;
 import org.apache.pinot.query.planner.logical.RexExpression;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
 
 
 public class AggregateOperatorTest {
 
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _input;
+
+  @BeforeMethod
+  public void setUp() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterMethod
+  public void tearDown()
+      throws Exception {
+    _mocks.close();
+  }
+
+  @Test
+  public void shouldHandleUpstreamErrorBlocks() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new 
Exception("foo!")));
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group);
+
+    // When:
+    TransferableBlock block1 = operator.nextBlock(); // build
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block1.isErrorBlock(), "Input errors should propagate 
immediately");
+  }
+
+  @Test
+  public void shouldHandleEndOfStreamBlockWithNoOtherInputs() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+
+    Mockito.when(_input.nextBlock())
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group);
+
+    // When:
+    TransferableBlock block = operator.nextBlock();
+
+    // Then:
+    Mockito.verify(_input, Mockito.times(1)).nextBlock();
+    Assert.assertTrue(block.isEndOfStreamBlock(), "EOS blocks should 
propagate");
+  }
+
+  @Test
+  public void shouldHandleUpstreamNoOpBlocksWhileConstructing() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(getSum(new 
RexExpression.InputRef(1)));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
ColumnDataType[]{INT, INT});
+    Mockito.when(_input.nextBlock())
+        .thenReturn(block(inSchema, new Object[]{1, 1}))
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+
+    DataSchema outSchema = new DataSchema(new String[]{"sum"}, new 
ColumnDataType[]{DOUBLE});
+    AggregateOperator operator = new AggregateOperator(_input, outSchema, 
calls, group);

Review Comment:
   nit: these can be private static? 
   good thing is then you can use these to mock the operators. 
   but i am also ok if the goal is to keep these tests isolated and easy to 
maintain. 



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
  * If the input is single value, the output type will be input type. 
Otherwise, the output type will be double.
  */
 public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+  interface Merger extends BiFunction<Object, Object, Object> {
+  }
+
   private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+  private static final Map<String, Merger> MERGERS = ImmutableMap

Review Comment:
   this private static member can be in Accumulators?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to