walterddr commented on code in PR #9767:
URL: https://github.com/apache/pinot/pull/9767#discussion_r1018240339
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
* If the input is single value, the output type will be input type.
Otherwise, the output type will be double.
*/
public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+ interface Merger extends BiFunction<Object, Object, Object> {
+ }
Review Comment:
nit: move next to Accumulator? also make it private? i don't foresee anyone
uses this interface and accumulate is already a private member class
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java:
##########
@@ -18,26 +18,208 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.sql.SqlKind;
+import org.apache.pinot.common.datablock.DataBlock;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.spi.data.FieldSpec;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
public class AggregateOperatorTest {
+ private AutoCloseable _mocks;
+
+ @Mock
+ private Operator<TransferableBlock> _input;
+
+ @BeforeMethod
+ public void setUp() {
+ _mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ throws Exception {
+ _mocks.close();
+ }
+
+ @Test
+ public void shouldHandleUpstreamErrorBlocks() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
+
+ Mockito.when(_input.nextBlock())
+ .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new
Exception("foo!")));
+
+ DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+
+ // When:
+ TransferableBlock block1 = operator.nextBlock(); // build
+
+ // Then:
+ Mockito.verify(_input, Mockito.times(1)).nextBlock();
+ Assert.assertTrue(block1.isErrorBlock(), "Input errors should propagate
immediately");
+ }
+
+ @Test
+ public void shouldHandleEndOfStreamBlockWithNoOtherInputs() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
+
+ Mockito.when(_input.nextBlock())
+ .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+ DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
+
+ // When:
+ TransferableBlock block = operator.nextBlock();
+
+ // Then:
+ Mockito.verify(_input, Mockito.times(1)).nextBlock();
+ Assert.assertTrue(block.isEndOfStreamBlock(), "EOS blocks should
propagate");
+ }
+
+ @Test
+ public void shouldHandleUpstreamNoOpBlocksWhileConstructing() {
+ // Given:
+ List<RexExpression> calls = ImmutableList.of(getSum(new
RexExpression.InputRef(1)));
+ List<RexExpression> group = ImmutableList.of(new
RexExpression.InputRef(0));
+
+ DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new
ColumnDataType[]{INT, INT});
+ Mockito.when(_input.nextBlock())
+ .thenReturn(block(inSchema, new Object[]{1, 1}))
+ .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock());
+
+ DataSchema outSchema = new DataSchema(new String[]{"sum"}, new
ColumnDataType[]{DOUBLE});
+ AggregateOperator operator = new AggregateOperator(_input, outSchema,
calls, group);
Review Comment:
nit: these can be private static?
good thing is then you can use these to mock the operators.
but i am also ok if the goal is to keep these tests isolated and easy to
maintain.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java:
##########
@@ -53,17 +56,32 @@
* If the input is single value, the output type will be input type.
Otherwise, the output type will be double.
*/
public class AggregateOperator extends BaseOperator<TransferableBlock> {
+
+ interface Merger extends BiFunction<Object, Object, Object> {
+ }
+
private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
+ private static final Map<String, Merger> MERGERS = ImmutableMap
Review Comment:
this private static member can be in Accumulators?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]