This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 625330e6fd7 Add distinct UNION set operator (#16570)
625330e6fd7 is described below
commit 625330e6fd7fee654f7da3a90fe42e4c9eed421a
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Aug 14 09:36:22 2025 +0530
Add distinct UNION set operator (#16570)
---
.../pinot/query/runtime/operator/SetOperator.java | 82 ++++++----
.../{UnionOperator.java => UnionAllOperator.java} | 43 ++---
.../query/runtime/operator/UnionOperator.java | 88 ++++++-----
.../query/runtime/plan/PlanNodeToOpChain.java | 5 +-
.../runtime/operator/IntersectOperatorTest.java | 45 ++++++
.../query/runtime/operator/MinusOperatorTest.java | 45 ++++++
...OperatorTest.java => UnionAllOperatorTest.java} | 82 ++++++----
.../query/runtime/operator/UnionOperatorTest.java | 54 ++++++-
.../runtime/queries/ResourceBasedQueriesTest.java | 21 ++-
.../src/test/resources/queries/SetOps.json | 51 ------
.../src/test/resources/queries/SetOpsH2.json | 85 ++++++++++
.../src/test/resources/queries/SetOpsNonH2.json | 175 +++++++++++++++++++++
12 files changed, 577 insertions(+), 199 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index 8b051eae867..ae5e0a5471f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import java.util.ArrayList;
@@ -43,24 +44,23 @@ import org.apache.pinot.segment.spi.IndexSegment;
public abstract class SetOperator extends MultiStageOperator {
protected final Multiset<Record> _rightRowSet;
- private final List<MultiStageOperator> _inputOperators;
- private final MultiStageOperator _leftChildOperator;
- private final MultiStageOperator _rightChildOperator;
- private final DataSchema _dataSchema;
+ protected final MultiStageOperator _leftChildOperator;
+ protected final MultiStageOperator _rightChildOperator;
+ protected final DataSchema _dataSchema;
- private boolean _isRightSetBuilt;
- protected MseBlock.Eos _eos;
- protected final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
+ private boolean _isRightChildOperatorProcessed;
+ private MseBlock.Eos _eos;
+ private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
public SetOperator(OpChainExecutionContext opChainExecutionContext,
List<MultiStageOperator> inputOperators,
DataSchema dataSchema) {
super(opChainExecutionContext);
_dataSchema = dataSchema;
- _inputOperators = inputOperators;
- _leftChildOperator = getChildOperators().get(0);
- _rightChildOperator = getChildOperators().get(1);
+ Preconditions.checkState(inputOperators.size() == 2, "Set operator should
have 2 child operators");
+ _leftChildOperator = inputOperators.get(0);
+ _rightChildOperator = inputOperators.get(1);
_rightRowSet = HashMultiset.create();
- _isRightSetBuilt = false;
+ _isRightChildOperatorProcessed = false;
}
@Override
@@ -71,7 +71,7 @@ public abstract class SetOperator extends MultiStageOperator {
@Override
public List<MultiStageOperator> getChildOperators() {
- return _inputOperators;
+ return List.of(_leftChildOperator, _rightChildOperator);
}
@Override
@@ -96,17 +96,40 @@ public abstract class SetOperator extends
MultiStageOperator {
@Override
protected MseBlock getNextBlock() {
- if (!_isRightSetBuilt) {
- // construct a SET with all the right side rows.
- constructRightBlockSet();
- }
if (_eos != null) {
return _eos;
}
- return constructResultBlockSet();
+
+ if (!_isRightChildOperatorProcessed) {
+ MseBlock mseBlock = processRightOperator();
+
+ if (mseBlock.isData()) {
+ return mseBlock;
+ } else if (mseBlock.isError()) {
+ _eos = (MseBlock.Eos) mseBlock;
+ return _eos;
+ } else if (mseBlock.isSuccess()) {
+ // If it's a regular EOS block, we continue to process the left child
operator.
+ _isRightChildOperatorProcessed = true;
+ }
+ }
+
+ MseBlock mseBlock = processLeftOperator();
+ if (mseBlock.isEos()) {
+ _eos = (MseBlock.Eos) mseBlock;
+ return _eos;
+ } else {
+ return mseBlock;
+ }
}
- protected void constructRightBlockSet() {
+ /**
+ * Processes the right child operator and builds the set of rows that can be
used to filter the left child. This method
+ * can be overridden to also be able to return data blocks while processing
the right operator.
+ *
+ * @return either a data block containing rows or an EoS block, never {@code
null}.
+ */
+ protected MseBlock processRightOperator() {
MseBlock block = _rightChildOperator.nextBlock();
while (block.isData()) {
MseBlock.Data dataBlock = (MseBlock.Data) block;
@@ -116,23 +139,22 @@ public abstract class SetOperator extends
MultiStageOperator {
sampleAndCheckInterruption();
block = _rightChildOperator.nextBlock();
}
- MseBlock.Eos eosBlock = (MseBlock.Eos) block;
- if (eosBlock.isError()) {
- _eos = eosBlock;
- } else {
- _isRightSetBuilt = true;
- }
+ assert block.isEos();
+ return block;
}
- protected MseBlock constructResultBlockSet() {
+ /**
+ * Processes the left child operator and returns blocks of rows that match
the criteria defined by the set operation.
+ *
+ * @return block containing matched rows or EoS, never {@code null}.
+ */
+ protected MseBlock processLeftOperator() {
// Keep reading the input blocks until we find a match row or all blocks
are processed.
// TODO: Consider batching the rows to improve performance.
while (true) {
MseBlock leftBlock = _leftChildOperator.nextBlock();
if (leftBlock.isEos()) {
- MseBlock.Eos eosBlock = (MseBlock.Eos) leftBlock;
- _eos = eosBlock;
- return eosBlock;
+ return leftBlock;
}
MseBlock.Data dataBlock = (MseBlock.Data) leftBlock;
List<Object[]> rows = new ArrayList<>();
@@ -155,8 +177,10 @@ public abstract class SetOperator extends
MultiStageOperator {
/**
* Returns true if the row is matched.
+ * <p>
* Also updates the right row set based on the Operator.
- * @param row
+ *
+ * @param row the row to be checked for matching.
* @return true if the row is matched.
*/
protected abstract boolean handleRowMatched(Object[] row);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
similarity index 55%
copy from
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
copy to
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
index d7965aa39ac..a3237968939 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
@@ -19,12 +19,8 @@
package org.apache.pinot.query.runtime.operator;
import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.runtime.blocks.MseBlock;
-import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
-import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,14 +29,11 @@ import org.slf4j.LoggerFactory;
/**
* Union operator for UNION ALL queries.
*/
-public class UnionOperator extends SetOperator {
- private static final Logger LOGGER =
LoggerFactory.getLogger(UnionOperator.class);
- private static final String EXPLAIN_NAME = "UNION";
- @Nullable
- private MultiStageQueryStats _queryStats = null;
- private int _finishedChildren = 0;
+public class UnionAllOperator extends SetOperator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(UnionAllOperator.class);
+ private static final String EXPLAIN_NAME = "UNION_ALL";
- public UnionOperator(OpChainExecutionContext opChainExecutionContext,
List<MultiStageOperator> inputOperators,
+ public UnionAllOperator(OpChainExecutionContext opChainExecutionContext,
List<MultiStageOperator> inputOperators,
DataSchema dataSchema) {
super(opChainExecutionContext, inputOperators, dataSchema);
}
@@ -61,35 +54,17 @@ public class UnionOperator extends SetOperator {
}
@Override
- protected MseBlock getNextBlock() {
- if (_eos != null) {
- return _eos;
- }
- List<MultiStageOperator> childOperators = getChildOperators();
- for (int i = _finishedChildren; i < childOperators.size(); i++) {
- MultiStageOperator upstreamOperator = childOperators.get(i);
- MseBlock block = upstreamOperator.nextBlock();
- if (block.isData()) {
- return block;
- }
- MseBlock.Eos eosBlock = (MseBlock.Eos) block;
- if (eosBlock.isSuccess()) {
- _finishedChildren++;
- } else {
- _eos = eosBlock;
- return block;
- }
- }
- return SuccessMseBlock.INSTANCE;
+ protected MseBlock processRightOperator() {
+ return _rightChildOperator.nextBlock();
}
@Override
- protected StatMap<?> copyStatMaps() {
- return new StatMap<>(_statMap);
+ protected MseBlock processLeftOperator() {
+ return _leftChildOperator.nextBlock();
}
@Override
protected boolean handleRowMatched(Object[] row) {
- throw new UnsupportedOperationException("Union operator does not support
row matching");
+ throw new UnsupportedOperationException("UNION ALL operator does not
support row matching");
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
index d7965aa39ac..db43853d56c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
@@ -18,78 +18,82 @@
*/
package org.apache.pinot.query.runtime.operator;
+import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.query.runtime.blocks.MseBlock;
-import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
-import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Union operator for UNION ALL queries.
+ * Union operator for UNION queries. Unlike {@link UnionAllOperator}, this
operator removes duplicate rows and only
+ * returns distinct rows.
*/
public class UnionOperator extends SetOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(UnionOperator.class);
private static final String EXPLAIN_NAME = "UNION";
- @Nullable
- private MultiStageQueryStats _queryStats = null;
- private int _finishedChildren = 0;
- public UnionOperator(OpChainExecutionContext opChainExecutionContext,
List<MultiStageOperator> inputOperators,
- DataSchema dataSchema) {
+ public UnionOperator(OpChainExecutionContext opChainExecutionContext,
+ List<MultiStageOperator> inputOperators, DataSchema dataSchema) {
super(opChainExecutionContext, inputOperators, dataSchema);
}
@Override
- protected Logger logger() {
- return LOGGER;
- }
-
- @Override
- public Type getOperatorType() {
- return Type.UNION;
+ protected MseBlock processRightOperator() {
+ MseBlock block = _rightChildOperator.nextBlock();
+ while (block.isData()) {
+ MseBlock.Data dataBlock = (MseBlock.Data) block;
+ List<Object[]> rows = new ArrayList<>();
+ for (Object[] row : dataBlock.asRowHeap().getRows()) {
+ Record record = new Record(row);
+ if (!_rightRowSet.contains(record)) {
+ // Add a new unique row.
+ rows.add(row);
+ _rightRowSet.add(record);
+ }
+ }
+ sampleAndCheckInterruption();
+ // If we have collected some rows, return them as a new block.
+ if (!rows.isEmpty()) {
+ return new RowHeapDataBlock(rows, _dataSchema);
+ } else {
+ block = _rightChildOperator.nextBlock();
+ }
+ }
+ assert block.isEos();
+ return block;
}
@Override
- public String toExplainString() {
- return EXPLAIN_NAME;
+ protected boolean handleRowMatched(Object[] row) {
+ if (!_rightRowSet.contains(new Record(row))) {
+ // Row is unique, add it to the result and also to the row set to skip
later duplicates.
+ _rightRowSet.add(new Record(row));
+ return true;
+ } else {
+ // Row is a duplicate, skip it.
+ return false;
+ }
}
@Override
- protected MseBlock getNextBlock() {
- if (_eos != null) {
- return _eos;
- }
- List<MultiStageOperator> childOperators = getChildOperators();
- for (int i = _finishedChildren; i < childOperators.size(); i++) {
- MultiStageOperator upstreamOperator = childOperators.get(i);
- MseBlock block = upstreamOperator.nextBlock();
- if (block.isData()) {
- return block;
- }
- MseBlock.Eos eosBlock = (MseBlock.Eos) block;
- if (eosBlock.isSuccess()) {
- _finishedChildren++;
- } else {
- _eos = eosBlock;
- return block;
- }
- }
- return SuccessMseBlock.INSTANCE;
+ protected Logger logger() {
+ return LOGGER;
}
@Override
- protected StatMap<?> copyStatMaps() {
- return new StatMap<>(_statMap);
+ public Type getOperatorType() {
+ return Type.UNION;
}
+ @Nullable
@Override
- protected boolean handleRowMatched(Object[] row) {
- throw new UnsupportedOperationException("Union operator does not support
row matching");
+ public String toExplainString() {
+ return EXPLAIN_NAME;
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index a1ef3c9571b..53d413d1eaa 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -59,6 +59,7 @@ import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.operator.SortOperator;
import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.operator.UnionAllOperator;
import org.apache.pinot.query.runtime.operator.UnionOperator;
import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
@@ -171,7 +172,9 @@ public class PlanNodeToOpChain {
}
switch (setOpNode.getSetOpType()) {
case UNION:
- return new UnionOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
+ return setOpNode.isAll() ? new UnionAllOperator(context,
inputOperators,
+ setOpNode.getInputs().get(0).getDataSchema())
+ : new UnionOperator(context, inputOperators,
setOpNode.getInputs().get(0).getDataSchema());
case INTERSECT:
return setOpNode.isAll() ? new IntersectAllOperator(context,
inputOperators,
setOpNode.getInputs().get(0).getDataSchema())
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
index 028d9c4422b..335e17e9a78 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.mockito.Mock;
@@ -115,4 +116,48 @@ public class IntersectOperatorTest {
Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
}
}
+
+ @Test
+ public void testErrorBlockRightChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
right operator")));
+
+ IntersectOperator intersectOperator =
+ new IntersectOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = intersectOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = intersectOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
+
+ @Test
+ public void testErrorBlockLeftChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
left operator")));
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+
+ IntersectOperator intersectOperator =
+ new IntersectOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = intersectOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = intersectOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
index 2b9409290da..83ff007985b 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.mockito.Mock;
@@ -117,4 +118,48 @@ public class MinusOperatorTest {
Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
}
}
+
+ @Test
+ public void testErrorBlockRightChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
right operator")));
+
+ MinusOperator minusOperator =
+ new MinusOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = minusOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = minusOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
+
+ @Test
+ public void testErrorBlockLeftChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
left operator")));
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+
+ MinusOperator minusOperator =
+ new MinusOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = minusOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = minusOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
similarity index 54%
copy from
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
copy to
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
index 2b9409290da..e5e01959014 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
@@ -19,10 +19,12 @@
package org.apache.pinot.query.runtime.operator;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.mockito.Mock;
@@ -34,7 +36,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-public class MinusOperatorTest {
+public class UnionAllOperatorTest {
private AutoCloseable _mocks;
@Mock
@@ -59,28 +61,31 @@ public class MinusOperatorTest {
}
@Test
- public void testExceptOperator() {
+ public void testUnionOperator() {
DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
});
Mockito.when(_leftOperator.nextBlock())
- .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}, new Object[]{3, "CC"},
- new Object[]{4, "DD"}))
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
.thenReturn(SuccessMseBlock.INSTANCE);
Mockito.when(_rightOperator.nextBlock()).thenReturn(
- OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}, new Object[]{5, "EE"}))
+ OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}, new Object[]{5, "cc"}))
.thenReturn(SuccessMseBlock.INSTANCE);
- MinusOperator minusOperator =
- new MinusOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ UnionAllOperator unionAllOperator =
+ new UnionAllOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
schema);
-
- MseBlock result = minusOperator.nextBlock();
- while (result.isEos()) {
- result = minusOperator.nextBlock();
+ List<Object[]> resultRows = new ArrayList<>();
+ MseBlock result = unionAllOperator.nextBlock();
+ while (result.isData()) {
+ resultRows.addAll(((MseBlock.Data) result).asRowHeap().getRows());
+ result = unionAllOperator.nextBlock();
}
- List<Object[]> resultRows = ((MseBlock.Data) result).asRowHeap().getRows();
- List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new
Object[]{4, "DD"});
+ // Note that UNION ALL does not guarantee the order of rows, and our
implementation adds rows from the right child
+ // first
+ List<Object[]> expectedRows =
+ Arrays.asList(new Object[]{3, "aa"}, new Object[]{4, "bb"}, new
Object[]{5, "cc"}, new Object[]{1, "AA"},
+ new Object[]{2, "BB"});
Assert.assertEquals(resultRows.size(), expectedRows.size());
for (int i = 0; i < resultRows.size(); i++) {
Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
@@ -88,33 +93,46 @@ public class MinusOperatorTest {
}
@Test
- public void testDedup() {
+ public void testErrorBlockRightChild() {
DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
});
Mockito.when(_leftOperator.nextBlock())
- .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}, new Object[]{3, "CC"},
- new Object[]{4, "DD"}, new Object[]{1, "AA"}, new Object[]{2,
"BB"}, new Object[]{3, "CC"},
- new Object[]{4, "DD"}))
- .thenReturn(SuccessMseBlock.INSTANCE);
- Mockito.when(_rightOperator.nextBlock()).thenReturn(
- OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}, new Object[]{5, "EE"},
- new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5,
"EE"}))
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
.thenReturn(SuccessMseBlock.INSTANCE);
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
right operator")));
- MinusOperator minusOperator =
- new MinusOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ UnionAllOperator unionAllOperator =
+ new UnionAllOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
schema);
-
- MseBlock result = minusOperator.nextBlock();
- while (result.isEos()) {
- result = minusOperator.nextBlock();
+ MseBlock result = unionAllOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = unionAllOperator.nextBlock();
}
- List<Object[]> resultRows = ((MseBlock.Data) result).asRowHeap().getRows();
- List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new
Object[]{4, "DD"});
- Assert.assertEquals(resultRows.size(), expectedRows.size());
- for (int i = 0; i < resultRows.size(); i++) {
- Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+ Assert.assertTrue(result.isError());
+ }
+
+ @Test
+ public void testErrorBlockLeftChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
left operator")));
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+
+ UnionAllOperator unionAllOperator =
+ new UnionAllOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = unionAllOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = unionAllOperator.nextBlock();
}
+ Assert.assertTrue(result.isError());
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
index 562528fcc25..c8b27d466c6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.mockito.Mock;
@@ -67,8 +68,9 @@ public class UnionOperatorTest {
Mockito.when(_leftOperator.nextBlock())
.thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
.thenReturn(SuccessMseBlock.INSTANCE);
- Mockito.when(_rightOperator.nextBlock()).thenReturn(
- OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}, new Object[]{5, "cc"}))
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}, new Object[]{5, "cc"},
+ new Object[]{2, "BB"}))
.thenReturn(SuccessMseBlock.INSTANCE);
UnionOperator unionOperator =
@@ -81,11 +83,55 @@ public class UnionOperatorTest {
result = unionOperator.nextBlock();
}
List<Object[]> expectedRows =
- Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"}, new
Object[]{3, "aa"}, new Object[]{4, "bb"},
- new Object[]{5, "cc"});
+ Arrays.asList(new Object[]{3, "aa"}, new Object[]{4, "bb"}, new
Object[]{5, "cc"}, new Object[]{2, "BB"},
+ new Object[]{1, "AA"});
Assert.assertEquals(resultRows.size(), expectedRows.size());
for (int i = 0; i < resultRows.size(); i++) {
Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
}
}
+
+ @Test
+ public void testErrorBlockRightChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new
Object[]{2, "BB"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
right operator")));
+
+ UnionOperator unionOperator =
+ new UnionOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = unionOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = unionOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
+
+ @Test
+ public void testErrorBlockLeftChild() {
+ DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"},
new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+ });
+ Mockito.when(_leftOperator.nextBlock())
+ .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in
left operator")));
+ Mockito.when(_rightOperator.nextBlock())
+ .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new
Object[]{4, "bb"}))
+ .thenReturn(SuccessMseBlock.INSTANCE);
+
+ UnionOperator unionOperator =
+ new UnionOperator(OperatorTestUtil.getTracingContext(),
ImmutableList.of(_leftOperator, _rightOperator),
+ schema);
+ MseBlock result = unionOperator.nextBlock();
+ // Keep calling nextBlock until we get an EoS block
+ while (!result.isEos()) {
+ result = unionOperator.nextBlock();
+ }
+ Assert.assertTrue(result.isError());
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 9be04a5b497..93eff5c5ac8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -62,6 +62,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
+import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -287,8 +288,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
throws Exception {
// query pinot
if (ignoreV2Optimizer) {
- LOGGER.warn("Ignoring query for test-case ({}): with v2 optimizer: {}", testCaseName, sql);
- return;
+ throw new SkipException(
+ "Ignoring query for test-case with v2 optimizer, testCase: " + testCaseName + ", SQL: " + sql);
}
sql = String.format("SET usePhysicalOptimizer=true; %s", sql);
runQuery(sql, expect, false).ifPresent(queryResult -> {
@@ -302,7 +303,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
@Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
public void testQueryTestCasesWithOutput(String testCaseName, boolean isIgnored, String sql, String h2Sql,
- List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
+ List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
throws Exception {
runQuery(sql, expect, false).ifPresent(
queryResult -> compareRowEquals(queryResult.getResultTable(), expectedRows, keepOutputRowOrder));
@@ -310,8 +311,12 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
@Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
public void testQueryTestCasesWithNewOptimizerWithOutput(String testCaseName, boolean isIgnored, String sql,
- String h2Sql, List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
+ String h2Sql, List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
throws Exception {
+ if (ignoreV2Optimizer) {
+ throw new SkipException(
+ "Ignoring query for test-case with v2 optimizer, testCase: " + testCaseName + ", SQL: " + sql);
+ }
final String finalSql = String.format("SET usePhysicalOptimizer=true; %s", sql);
runQuery(finalSql, expect, false).ifPresent(
queryResult -> compareRowEquals(queryResult.getResultTable(), expectedRows, keepOutputRowOrder));
@@ -319,8 +324,12 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
@Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
public void testQueryTestCasesWithLiteModeWithOutput(String testCaseName, boolean isIgnored, String sql,
- String h2Sql, List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
+ String h2Sql, List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder, boolean ignoreV2Optimizer)
throws Exception {
+ if (ignoreV2Optimizer) {
+ throw new SkipException(
+ "Ignoring query for test-case with v2 optimizer, testCase: " + testCaseName + ", SQL: " + sql);
+ }
final String finalSql = String.format("SET usePhysicalOptimizer=true; SET useLiteMode=true; %s", sql);
runQuery(finalSql, expect, false).ifPresent(
queryResult -> compareRowEquals(queryResult.getResultTable(), expectedRows, keepOutputRowOrder));
@@ -435,7 +444,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
}
Object[] testEntry = new Object[]{
testCaseName, queryCase._ignored, sql, h2Sql, expectedRows, queryCase._expectedException,
- queryCase._keepOutputRowOrder
+ queryCase._keepOutputRowOrder, queryCase._ignoreV2Optimizer
};
providerContent.add(testEntry);
}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOps.json b/pinot-query-runtime/src/test/resources/queries/SetOps.json
deleted file mode 100644
index 4ae8a800e6a..00000000000
--- a/pinot-query-runtime/src/test/resources/queries/SetOps.json
+++ /dev/null
@@ -1,51 +0,0 @@
-{
- "set_op_test": {
- "tables": {
- "tbl1": {
- "schema":[
- {"name": "intCol", "type": "INT"},
- {"name": "longCol", "type": "LONG"},
- {"name": "floatCol", "type": "FLOAT"},
- {"name": "doubleCol", "type": "DOUBLE"},
- {"name": "strCol", "type": "STRING"}
- ],
- "inputs": [
- [1, 8, 3.0, 5.176518e16, "lyons"],
- [2, 9, 4.0, 4.608155e11, "onan"],
- [3, 14, 5.0, 1.249261e11, "rudvalis"],
- [4, 21, 6.0, 8.677557e19, "janko"],
- [1, 41, 2.0, 4.15478e33, "baby"],
- [2, 46, 1.0, 8.08017e53, "monster"]
- ]
- },
- "tbl2": {
- "schema":[
- {"name": "intCol", "type": "INT"},
- {"name": "strCol", "type": "STRING"}
- ],
- "inputs": [
- [1, "foo"],
- [2, "bar"]
- ]
- },
- "tbl3": {
- "schema":[
- {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
- {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
- ],
- "inputs": [
- [[1, 10], ["foo1", "foo2"]]
- ]
- }
- },
- "queries": [
- { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl1}"},
- { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
- { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
- { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
- { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'baby' "},
- { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
- { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT intArrayCol, strArrayCol FROM {tbl3}"}
- ]
- }
-}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
new file mode 100644
index 00000000000..8b09a61a94b
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
@@ -0,0 +1,85 @@
+{
+ "set_op_test_h2": {
+ "tables": {
+ "tbl1": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 8, 3.0, 5.176518e16, "lyons"],
+ [2, 9, 4.0, 4.608155e11, "onan"],
+ [3, 14, 5.0, 1.249261e11, "rudvalis"],
+ [4, 21, 6.0, 8.677557e19, "janko"],
+ [1, 41, 2.0, 4.15478e33, "baby"],
+ [2, 46, 1.0, 8.08017e53, "monster"]
+ ]
+ },
+ "tbl2": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "foo"],
+ [2, "bar"],
+ [1, "bar"]
+ ]
+ },
+ "tbl3": {
+ "schema":[
+ {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
+ {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
+ ],
+ "inputs": [
+ [[1, 10], ["foo1", "foo2"]]
+ ]
+ }
+ },
+ "queries": [
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2}"},
+ { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}"},
+ { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)", "ignoreV2Optimizer": true},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+ { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+ { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT intCol FROM {tbl1} WHERE floatCol <2.5 "},
+ { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
+ { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE strCol = 'baby' "},
+ { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
+ { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT intArrayCol, strArrayCol FROM {tbl3}"},
+ { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+ { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"}
+ ]
+ }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
new file mode 100644
index 00000000000..70f79836e8e
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
@@ -0,0 +1,175 @@
+{
+ "set_op_test_non_h2": {
+ "tables": {
+ "tbl1": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "longCol", "type": "LONG"},
+ {"name": "floatCol", "type": "FLOAT"},
+ {"name": "doubleCol", "type": "DOUBLE"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, 8, 3.0, 5.176518e16, "lyons"],
+ [2, 9, 4.0, 4.608155e11, "onan"],
+ [3, 14, 5.0, 1.249261e11, "rudvalis"],
+ [4, 21, 6.0, 8.677557e19, "janko"],
+ [1, 41, 2.0, 4.15478e33, "baby"],
+ [2, 46, 1.0, 8.08017e53, "monster"]
+ ]
+ },
+ "tbl2": {
+ "schema":[
+ {"name": "intCol", "type": "INT"},
+ {"name": "strCol", "type": "STRING"}
+ ],
+ "inputs": [
+ [1, "foo"],
+ [2, "bar"],
+ [1, "bar"]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "INTERSECT ALL is not supported by H2, so we use hardcoded output to validate the query",
+ "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2}",
+ "outputs": [
+ [1],
+ [2],
+ [1]
+ ]
+ },
+ {
+ "description": "EXCEPT ALL is not supported by H2, so we use hardcoded output to validate the query",
+ "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2}",
+ "outputs": [
+ [3],
+ [4],
+ [2]
+ ]
+ },
+ {
+ "description": "MINUS ALL is not supported by H2, so we use hardcoded output to validate the query",
+ "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2}",
+ "outputs": [
+ [3],
+ [4],
+ [2]
+ ]
+ },
+ {
+ "description": "INTERSECT ALL with UNION",
+ "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [1],
+ [2],
+ [3],
+ [4]
+ ],
+ "ignoreV2Optimizer": true
+ },
+ {
+ "description": "INTERSECT ALL with UNION",
+ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [1],
+ [2],
+ [3],
+ [4]
+ ],
+ "ignoreV2Optimizer": true
+ },
+ {
+ "description": "INTERSECT ALL with UNION ALL",
+ "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [1],
+ [2],
+ [1],
+ [1],
+ [2],
+ [3],
+ [4],
+ [1],
+ [2]
+ ]
+ },
+ {
+ "description": "INTERSECT ALL with INTERSECT",
+ "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [1],
+ [2]
+ ]
+ },
+ {
+ "description": "INTERSECT ALL with EXCEPT",
+ "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}",
+ "outputs": [
+ ]
+ },
+ {
+ "description": "EXCEPT ALL with UNION",
+ "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [3],
+ [4],
+ [1],
+ [2]
+ ],
+ "ignoreV2Optimizer": true
+ },
+ {
+ "description": "EXCEPT ALL with UNION",
+ "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [3],
+ [4],
+ [1],
+ [2]
+ ]
+ },
+ {
+ "description": "EXCEPT ALL with UNION ALL",
+ "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [3],
+ [4],
+ [2],
+ [1],
+ [2],
+ [3],
+ [4],
+ [1],
+ [2]
+ ]
+ },
+ {
+ "description": "MINUS ALL with INTERSECT",
+ "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [3],
+ [4],
+ [1],
+ [2]
+ ]
+ },
+ {
+ "description": "MINUS ALL with EXCEPT",
+ "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}",
+ "outputs": [
+ ]
+ },
+ {
+ "description": "MINUS ALL with INTERSECT ALL",
+ "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} INTERSECT ALL SELECT intCol FROM {tbl1}",
+ "outputs": [
+ [3],
+ [4],
+ [2]
+ ]
+ }
+ ]
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]