This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 625330e6fd7 Add distinct UNION set operator (#16570)
625330e6fd7 is described below

commit 625330e6fd7fee654f7da3a90fe42e4c9eed421a
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Aug 14 09:36:22 2025 +0530

    Add distinct UNION set operator (#16570)
---
 .../pinot/query/runtime/operator/SetOperator.java  |  82 ++++++----
 .../{UnionOperator.java => UnionAllOperator.java}  |  43 ++---
 .../query/runtime/operator/UnionOperator.java      |  88 ++++++-----
 .../query/runtime/plan/PlanNodeToOpChain.java      |   5 +-
 .../runtime/operator/IntersectOperatorTest.java    |  45 ++++++
 .../query/runtime/operator/MinusOperatorTest.java  |  45 ++++++
 ...OperatorTest.java => UnionAllOperatorTest.java} |  82 ++++++----
 .../query/runtime/operator/UnionOperatorTest.java  |  54 ++++++-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  21 ++-
 .../src/test/resources/queries/SetOps.json         |  51 ------
 .../src/test/resources/queries/SetOpsH2.json       |  85 ++++++++++
 .../src/test/resources/queries/SetOpsNonH2.json    | 175 +++++++++++++++++++++
 12 files changed, 577 insertions(+), 199 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
index 8b051eae867..ae5e0a5471f 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SetOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 import java.util.ArrayList;
@@ -43,24 +44,23 @@ import org.apache.pinot.segment.spi.IndexSegment;
 public abstract class SetOperator extends MultiStageOperator {
   protected final Multiset<Record> _rightRowSet;
 
-  private final List<MultiStageOperator> _inputOperators;
-  private final MultiStageOperator _leftChildOperator;
-  private final MultiStageOperator _rightChildOperator;
-  private final DataSchema _dataSchema;
+  protected final MultiStageOperator _leftChildOperator;
+  protected final MultiStageOperator _rightChildOperator;
+  protected final DataSchema _dataSchema;
 
-  private boolean _isRightSetBuilt;
-  protected MseBlock.Eos _eos;
-  protected final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
+  private boolean _isRightChildOperatorProcessed;
+  private MseBlock.Eos _eos;
+  private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
   public SetOperator(OpChainExecutionContext opChainExecutionContext, 
List<MultiStageOperator> inputOperators,
       DataSchema dataSchema) {
     super(opChainExecutionContext);
     _dataSchema = dataSchema;
-    _inputOperators = inputOperators;
-    _leftChildOperator = getChildOperators().get(0);
-    _rightChildOperator = getChildOperators().get(1);
+    Preconditions.checkState(inputOperators.size() == 2, "Set operator should 
have 2 child operators");
+    _leftChildOperator = inputOperators.get(0);
+    _rightChildOperator = inputOperators.get(1);
     _rightRowSet = HashMultiset.create();
-    _isRightSetBuilt = false;
+    _isRightChildOperatorProcessed = false;
   }
 
   @Override
@@ -71,7 +71,7 @@ public abstract class SetOperator extends MultiStageOperator {
 
   @Override
   public List<MultiStageOperator> getChildOperators() {
-    return _inputOperators;
+    return List.of(_leftChildOperator, _rightChildOperator);
   }
 
   @Override
@@ -96,17 +96,40 @@ public abstract class SetOperator extends 
MultiStageOperator {
 
   @Override
   protected MseBlock getNextBlock() {
-    if (!_isRightSetBuilt) {
-      // construct a SET with all the right side rows.
-      constructRightBlockSet();
-    }
     if (_eos != null) {
       return _eos;
     }
-    return constructResultBlockSet();
+
+    if (!_isRightChildOperatorProcessed) {
+      MseBlock mseBlock = processRightOperator();
+
+      if (mseBlock.isData()) {
+        return mseBlock;
+      } else if (mseBlock.isError()) {
+        _eos = (MseBlock.Eos) mseBlock;
+        return _eos;
+      } else if (mseBlock.isSuccess()) {
+        // If it's a regular EOS block, we continue to process the left child 
operator.
+        _isRightChildOperatorProcessed = true;
+      }
+    }
+
+    MseBlock mseBlock = processLeftOperator();
+    if (mseBlock.isEos()) {
+      _eos = (MseBlock.Eos) mseBlock;
+      return _eos;
+    } else {
+      return mseBlock;
+    }
   }
 
-  protected void constructRightBlockSet() {
+  /**
+   * Processes the right child operator and builds the set of rows that can be used to filter the left child. This method
+   * can be overridden to also be able to return data blocks while processing the right operator.
+   *
+   * @return either a data block containing rows or an EoS block, never {@code null}.
+   */
+  protected MseBlock processRightOperator() {
     MseBlock block = _rightChildOperator.nextBlock();
     while (block.isData()) {
       MseBlock.Data dataBlock = (MseBlock.Data) block;
@@ -116,23 +139,22 @@ public abstract class SetOperator extends 
MultiStageOperator {
       sampleAndCheckInterruption();
       block = _rightChildOperator.nextBlock();
     }
-    MseBlock.Eos eosBlock = (MseBlock.Eos) block;
-    if (eosBlock.isError()) {
-      _eos = eosBlock;
-    } else {
-      _isRightSetBuilt = true;
-    }
+    assert block.isEos();
+    return block;
   }
 
-  protected MseBlock constructResultBlockSet() {
+  /**
+   * Processes the left child operator and returns blocks of rows that match 
the criteria defined by the set operation.
+   *
+   * @return block containing matched rows or EoS, never {@code null}.
+   */
+  protected MseBlock processLeftOperator() {
     // Keep reading the input blocks until we find a match row or all blocks 
are processed.
     // TODO: Consider batching the rows to improve performance.
     while (true) {
       MseBlock leftBlock = _leftChildOperator.nextBlock();
       if (leftBlock.isEos()) {
-        MseBlock.Eos eosBlock = (MseBlock.Eos) leftBlock;
-        _eos = eosBlock;
-        return eosBlock;
+        return leftBlock;
       }
       MseBlock.Data dataBlock = (MseBlock.Data) leftBlock;
       List<Object[]> rows = new ArrayList<>();
@@ -155,8 +177,10 @@ public abstract class SetOperator extends 
MultiStageOperator {
 
   /**
    * Returns true if the row is matched.
+   * <p>
    * Also updates the right row set based on the Operator.
-   * @param row
+   *
+   * @param row the row to be checked for matching.
    * @return true if the row is matched.
    */
   protected abstract boolean handleRowMatched(Object[] row);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
similarity index 55%
copy from 
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
copy to 
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
index d7965aa39ac..a3237968939 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionAllOperator.java
@@ -19,12 +19,8 @@
 package org.apache.pinot.query.runtime.operator;
 
 import java.util.List;
-import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
-import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
-import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,14 +29,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Union operator for UNION ALL queries.
  */
-public class UnionOperator extends SetOperator {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(UnionOperator.class);
-  private static final String EXPLAIN_NAME = "UNION";
-  @Nullable
-  private MultiStageQueryStats _queryStats = null;
-  private int _finishedChildren = 0;
+public class UnionAllOperator extends SetOperator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(UnionAllOperator.class);
+  private static final String EXPLAIN_NAME = "UNION_ALL";
 
-  public UnionOperator(OpChainExecutionContext opChainExecutionContext, 
List<MultiStageOperator> inputOperators,
+  public UnionAllOperator(OpChainExecutionContext opChainExecutionContext, 
List<MultiStageOperator> inputOperators,
       DataSchema dataSchema) {
     super(opChainExecutionContext, inputOperators, dataSchema);
   }
@@ -61,35 +54,17 @@ public class UnionOperator extends SetOperator {
   }
 
   @Override
-  protected MseBlock getNextBlock() {
-    if (_eos != null) {
-      return _eos;
-    }
-    List<MultiStageOperator> childOperators = getChildOperators();
-    for (int i = _finishedChildren; i < childOperators.size(); i++) {
-      MultiStageOperator upstreamOperator = childOperators.get(i);
-      MseBlock block = upstreamOperator.nextBlock();
-      if (block.isData()) {
-        return block;
-      }
-      MseBlock.Eos eosBlock = (MseBlock.Eos) block;
-      if (eosBlock.isSuccess()) {
-        _finishedChildren++;
-      } else {
-        _eos = eosBlock;
-        return block;
-      }
-    }
-    return SuccessMseBlock.INSTANCE;
+  protected MseBlock processRightOperator() {
+    return _rightChildOperator.nextBlock();
   }
 
   @Override
-  protected StatMap<?> copyStatMaps() {
-    return new StatMap<>(_statMap);
+  protected MseBlock processLeftOperator() {
+    return _leftChildOperator.nextBlock();
   }
 
   @Override
   protected boolean handleRowMatched(Object[] row) {
-    throw new UnsupportedOperationException("Union operator does not support 
row matching");
+    throw new UnsupportedOperationException("UNION ALL operator does not 
support row matching");
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
index d7965aa39ac..db43853d56c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/UnionOperator.java
@@ -18,78 +18,82 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import java.util.ArrayList;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
-import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
-import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Union operator for UNION ALL queries.
+ * Union operator for UNION queries. Unlike {@link UnionAllOperator}, this 
operator removes duplicate rows and only
+ * returns distinct rows.
  */
 public class UnionOperator extends SetOperator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(UnionOperator.class);
   private static final String EXPLAIN_NAME = "UNION";
-  @Nullable
-  private MultiStageQueryStats _queryStats = null;
-  private int _finishedChildren = 0;
 
-  public UnionOperator(OpChainExecutionContext opChainExecutionContext, 
List<MultiStageOperator> inputOperators,
-      DataSchema dataSchema) {
+  public UnionOperator(OpChainExecutionContext opChainExecutionContext,
+      List<MultiStageOperator> inputOperators, DataSchema dataSchema) {
     super(opChainExecutionContext, inputOperators, dataSchema);
   }
 
   @Override
-  protected Logger logger() {
-    return LOGGER;
-  }
-
-  @Override
-  public Type getOperatorType() {
-    return Type.UNION;
+  protected MseBlock processRightOperator() {
+    MseBlock block = _rightChildOperator.nextBlock();
+    while (block.isData()) {
+      MseBlock.Data dataBlock = (MseBlock.Data) block;
+      List<Object[]> rows = new ArrayList<>();
+      for (Object[] row : dataBlock.asRowHeap().getRows()) {
+        Record record = new Record(row);
+        if (!_rightRowSet.contains(record)) {
+          // Add a new unique row.
+          rows.add(row);
+          _rightRowSet.add(record);
+        }
+      }
+      sampleAndCheckInterruption();
+      // If we have collected some rows, return them as a new block.
+      if (!rows.isEmpty()) {
+        return new RowHeapDataBlock(rows, _dataSchema);
+      } else {
+        block = _rightChildOperator.nextBlock();
+      }
+    }
+    assert block.isEos();
+    return block;
   }
 
   @Override
-  public String toExplainString() {
-    return EXPLAIN_NAME;
+  protected boolean handleRowMatched(Object[] row) {
+    if (!_rightRowSet.contains(new Record(row))) {
+      // Row is unique, add it to the result and also to the row set to skip 
later duplicates.
+      _rightRowSet.add(new Record(row));
+      return true;
+    } else {
+      // Row is a duplicate, skip it.
+      return false;
+    }
   }
 
   @Override
-  protected MseBlock getNextBlock() {
-    if (_eos != null) {
-      return _eos;
-    }
-    List<MultiStageOperator> childOperators = getChildOperators();
-    for (int i = _finishedChildren; i < childOperators.size(); i++) {
-      MultiStageOperator upstreamOperator = childOperators.get(i);
-      MseBlock block = upstreamOperator.nextBlock();
-      if (block.isData()) {
-        return block;
-      }
-      MseBlock.Eos eosBlock = (MseBlock.Eos) block;
-      if (eosBlock.isSuccess()) {
-        _finishedChildren++;
-      } else {
-        _eos = eosBlock;
-        return block;
-      }
-    }
-    return SuccessMseBlock.INSTANCE;
+  protected Logger logger() {
+    return LOGGER;
   }
 
   @Override
-  protected StatMap<?> copyStatMaps() {
-    return new StatMap<>(_statMap);
+  public Type getOperatorType() {
+    return Type.UNION;
   }
 
+  @Nullable
   @Override
-  protected boolean handleRowMatched(Object[] row) {
-    throw new UnsupportedOperationException("Union operator does not support 
row matching");
+  public String toExplainString() {
+    return EXPLAIN_NAME;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
index a1ef3c9571b..53d413d1eaa 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanNodeToOpChain.java
@@ -59,6 +59,7 @@ import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.SortedMailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.operator.UnionAllOperator;
 import org.apache.pinot.query.runtime.operator.UnionOperator;
 import org.apache.pinot.query.runtime.operator.WindowAggregateOperator;
 import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
@@ -171,7 +172,9 @@ public class PlanNodeToOpChain {
         }
         switch (setOpNode.getSetOpType()) {
           case UNION:
-            return new UnionOperator(context, inputOperators, 
setOpNode.getInputs().get(0).getDataSchema());
+            return setOpNode.isAll() ? new UnionAllOperator(context, 
inputOperators,
+                setOpNode.getInputs().get(0).getDataSchema())
+                : new UnionOperator(context, inputOperators, 
setOpNode.getInputs().get(0).getDataSchema());
           case INTERSECT:
             return setOpNode.isAll() ? new IntersectAllOperator(context, 
inputOperators,
                 setOpNode.getInputs().get(0).getDataSchema())
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
index 028d9c4422b..335e17e9a78 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/IntersectOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.mockito.Mock;
@@ -115,4 +116,48 @@ public class IntersectOperatorTest {
       Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
     }
   }
+
+  @Test
+  public void testErrorBlockRightChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
right operator")));
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = intersectOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = intersectOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
+
+  @Test
+  public void testErrorBlockLeftChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
left operator")));
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+
+    IntersectOperator intersectOperator =
+        new IntersectOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = intersectOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = intersectOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
index 2b9409290da..83ff007985b 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.mockito.Mock;
@@ -117,4 +118,48 @@ public class MinusOperatorTest {
       Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
     }
   }
+
+  @Test
+  public void testErrorBlockRightChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
right operator")));
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = minusOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = minusOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
+
+  @Test
+  public void testErrorBlockLeftChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
left operator")));
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+
+    MinusOperator minusOperator =
+        new MinusOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = minusOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = minusOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
similarity index 54%
copy from 
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
copy to 
pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
index 2b9409290da..e5e01959014 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MinusOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionAllOperatorTest.java
@@ -19,10 +19,12 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.mockito.Mock;
@@ -34,7 +36,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
-public class MinusOperatorTest {
+public class UnionAllOperatorTest {
   private AutoCloseable _mocks;
 
   @Mock
@@ -59,28 +61,31 @@ public class MinusOperatorTest {
   }
 
   @Test
-  public void testExceptOperator() {
+  public void testUnionOperator() {
     DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
     Mockito.when(_leftOperator.nextBlock())
-        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}, new Object[]{3, "CC"},
-            new Object[]{4, "DD"}))
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
         .thenReturn(SuccessMseBlock.INSTANCE);
     Mockito.when(_rightOperator.nextBlock()).thenReturn(
-            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}, new Object[]{5, "EE"}))
+            OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}, new Object[]{5, "cc"}))
         .thenReturn(SuccessMseBlock.INSTANCE);
 
-    MinusOperator minusOperator =
-        new MinusOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+    UnionAllOperator unionAllOperator =
+        new UnionAllOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
             schema);
-
-    MseBlock result = minusOperator.nextBlock();
-    while (result.isEos()) {
-      result = minusOperator.nextBlock();
+    List<Object[]> resultRows = new ArrayList<>();
+    MseBlock result = unionAllOperator.nextBlock();
+    while (result.isData()) {
+      resultRows.addAll(((MseBlock.Data) result).asRowHeap().getRows());
+      result = unionAllOperator.nextBlock();
     }
-    List<Object[]> resultRows = ((MseBlock.Data) result).asRowHeap().getRows();
-    List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new 
Object[]{4, "DD"});
+    // Note that UNION ALL does not guarantee the order of rows, and our 
implementation adds rows from the right child
+    // first
+    List<Object[]> expectedRows =
+        Arrays.asList(new Object[]{3, "aa"}, new Object[]{4, "bb"}, new 
Object[]{5, "cc"}, new Object[]{1, "AA"},
+            new Object[]{2, "BB"});
     Assert.assertEquals(resultRows.size(), expectedRows.size());
     for (int i = 0; i < resultRows.size(); i++) {
       Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
@@ -88,33 +93,46 @@ public class MinusOperatorTest {
   }
 
   @Test
-  public void testDedup() {
+  public void testErrorBlockRightChild() {
     DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
         DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
     });
     Mockito.when(_leftOperator.nextBlock())
-        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}, new Object[]{3, "CC"},
-            new Object[]{4, "DD"}, new Object[]{1, "AA"}, new Object[]{2, 
"BB"}, new Object[]{3, "CC"},
-            new Object[]{4, "DD"}))
-        .thenReturn(SuccessMseBlock.INSTANCE);
-    Mockito.when(_rightOperator.nextBlock()).thenReturn(
-            OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}, new Object[]{5, "EE"},
-                new Object[]{1, "AA"}, new Object[]{2, "BB"}, new Object[]{5, 
"EE"}))
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
         .thenReturn(SuccessMseBlock.INSTANCE);
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
right operator")));
 
-    MinusOperator minusOperator =
-        new MinusOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+    UnionAllOperator unionAllOperator =
+        new UnionAllOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
             schema);
-
-    MseBlock result = minusOperator.nextBlock();
-    while (result.isEos()) {
-      result = minusOperator.nextBlock();
+    MseBlock result = unionAllOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = unionAllOperator.nextBlock();
     }
-    List<Object[]> resultRows = ((MseBlock.Data) result).asRowHeap().getRows();
-    List<Object[]> expectedRows = Arrays.asList(new Object[]{3, "CC"}, new 
Object[]{4, "DD"});
-    Assert.assertEquals(resultRows.size(), expectedRows.size());
-    for (int i = 0; i < resultRows.size(); i++) {
-      Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
+    Assert.assertTrue(result.isError());
+  }
+
+  @Test
+  public void testErrorBlockLeftChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
left operator")));
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+
+    UnionAllOperator unionAllOperator =
+        new UnionAllOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = unionAllOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = unionAllOperator.nextBlock();
     }
+    Assert.assertTrue(result.isError());
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
index 562528fcc25..c8b27d466c6 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/UnionOperatorTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.ErrorMseBlock;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.mockito.Mock;
@@ -67,8 +68,9 @@ public class UnionOperatorTest {
     Mockito.when(_leftOperator.nextBlock())
         .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
         .thenReturn(SuccessMseBlock.INSTANCE);
-    Mockito.when(_rightOperator.nextBlock()).thenReturn(
-            OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}, new Object[]{5, "cc"}))
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}, new Object[]{5, "cc"},
+            new Object[]{2, "BB"}))
         .thenReturn(SuccessMseBlock.INSTANCE);
 
     UnionOperator unionOperator =
@@ -81,11 +83,55 @@ public class UnionOperatorTest {
       result = unionOperator.nextBlock();
     }
     List<Object[]> expectedRows =
-        Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "BB"}, new 
Object[]{3, "aa"}, new Object[]{4, "bb"},
-            new Object[]{5, "cc"});
+        Arrays.asList(new Object[]{3, "aa"}, new Object[]{4, "bb"}, new 
Object[]{5, "cc"}, new Object[]{2, "BB"},
+            new Object[]{1, "AA"});
     Assert.assertEquals(resultRows.size(), expectedRows.size());
     for (int i = 0; i < resultRows.size(); i++) {
       Assert.assertEquals(resultRows.get(i), expectedRows.get(i));
     }
   }
+
+  @Test
+  public void testErrorBlockRightChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{1, "AA"}, new 
Object[]{2, "BB"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
right operator")));
+
+    UnionOperator unionOperator =
+        new UnionOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = unionOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = unionOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
+
+  @Test
+  public void testErrorBlockLeftChild() {
+    DataSchema schema = new DataSchema(new String[]{"int_col", "string_col"}, 
new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+    });
+    Mockito.when(_leftOperator.nextBlock())
+        .thenReturn(ErrorMseBlock.fromException(new RuntimeException("Error in 
left operator")));
+    Mockito.when(_rightOperator.nextBlock())
+        .thenReturn(OperatorTestUtil.block(schema, new Object[]{3, "aa"}, new 
Object[]{4, "bb"}))
+        .thenReturn(SuccessMseBlock.INSTANCE);
+
+    UnionOperator unionOperator =
+        new UnionOperator(OperatorTestUtil.getTracingContext(), 
ImmutableList.of(_leftOperator, _rightOperator),
+            schema);
+    MseBlock result = unionOperator.nextBlock();
+    // Keep calling nextBlock until we get an EoS block
+    while (!result.isEos()) {
+      result = unionOperator.nextBlock();
+    }
+    Assert.assertTrue(result.isError());
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 9be04a5b497..93eff5c5ac8 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -62,6 +62,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
+import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -287,8 +288,8 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       throws Exception {
     // query pinot
     if (ignoreV2Optimizer) {
-      LOGGER.warn("Ignoring query for test-case ({}): with v2 optimizer: {}", 
testCaseName, sql);
-      return;
+      throw new SkipException(
+          "Ignoring query for test-case with v2 optimizer, testCase: " + 
testCaseName + ", SQL: " + sql);
     }
     sql = String.format("SET usePhysicalOptimizer=true; %s", sql);
     runQuery(sql, expect, false).ifPresent(queryResult -> {
@@ -302,7 +303,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
 
   @Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
   public void testQueryTestCasesWithOutput(String testCaseName, boolean 
isIgnored, String sql, String h2Sql,
-      List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder)
+      List<Object[]> expectedRows, String expect, boolean keepOutputRowOrder, 
boolean ignoreV2Optimizer)
       throws Exception {
     runQuery(sql, expect, false).ifPresent(
         queryResult -> compareRowEquals(queryResult.getResultTable(), 
expectedRows, keepOutputRowOrder));
@@ -310,8 +311,12 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
 
   @Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
   public void testQueryTestCasesWithNewOptimizerWithOutput(String 
testCaseName, boolean isIgnored, String sql,
-      String h2Sql, List<Object[]> expectedRows, String expect, boolean 
keepOutputRowOrder)
+      String h2Sql, List<Object[]> expectedRows, String expect, boolean 
keepOutputRowOrder, boolean ignoreV2Optimizer)
       throws Exception {
+    if (ignoreV2Optimizer) {
+      throw new SkipException(
+          "Ignoring query for test-case with v2 optimizer, testCase: " + 
testCaseName + ", SQL: " + sql);
+    }
     final String finalSql = String.format("SET usePhysicalOptimizer=true; %s", 
sql);
     runQuery(finalSql, expect, false).ifPresent(
         queryResult -> compareRowEquals(queryResult.getResultTable(), 
expectedRows, keepOutputRowOrder));
@@ -319,8 +324,12 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
 
   @Test(dataProvider = "testResourceQueryTestCaseProviderBoth")
   public void testQueryTestCasesWithLiteModeWithOutput(String testCaseName, 
boolean isIgnored, String sql,
-      String h2Sql, List<Object[]> expectedRows, String expect, boolean 
keepOutputRowOrder)
+      String h2Sql, List<Object[]> expectedRows, String expect, boolean 
keepOutputRowOrder, boolean ignoreV2Optimizer)
       throws Exception {
+    if (ignoreV2Optimizer) {
+      throw new SkipException(
+          "Ignoring query for test-case with v2 optimizer, testCase: " + 
testCaseName + ", SQL: " + sql);
+    }
     final String finalSql = String.format("SET usePhysicalOptimizer=true; SET 
useLiteMode=true; %s", sql);
     runQuery(finalSql, expect, false).ifPresent(
         queryResult -> compareRowEquals(queryResult.getResultTable(), 
expectedRows, keepOutputRowOrder));
@@ -435,7 +444,7 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
           }
           Object[] testEntry = new Object[]{
               testCaseName, queryCase._ignored, sql, h2Sql, expectedRows, 
queryCase._expectedException,
-              queryCase._keepOutputRowOrder
+              queryCase._keepOutputRowOrder, queryCase._ignoreV2Optimizer
           };
           providerContent.add(testEntry);
         }
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOps.json b/pinot-query-runtime/src/test/resources/queries/SetOps.json
deleted file mode 100644
index 4ae8a800e6a..00000000000
--- a/pinot-query-runtime/src/test/resources/queries/SetOps.json
+++ /dev/null
@@ -1,51 +0,0 @@
-{
-  "set_op_test": {
-    "tables": {
-      "tbl1": {
-        "schema":[
-          {"name": "intCol", "type": "INT"},
-          {"name": "longCol", "type": "LONG"},
-          {"name": "floatCol", "type": "FLOAT"},
-          {"name": "doubleCol", "type": "DOUBLE"},
-          {"name": "strCol", "type": "STRING"}
-        ],
-        "inputs": [
-          [1, 8, 3.0, 5.176518e16, "lyons"],
-          [2, 9, 4.0, 4.608155e11, "onan"],
-          [3, 14, 5.0, 1.249261e11, "rudvalis"],
-          [4, 21, 6.0, 8.677557e19, "janko"],
-          [1, 41, 2.0, 4.15478e33, "baby"],
-          [2, 46, 1.0, 8.08017e53, "monster"]
-        ]
-      },
-      "tbl2": {
-        "schema":[
-          {"name": "intCol", "type": "INT"},
-          {"name": "strCol", "type": "STRING"}
-        ],
-        "inputs": [
-          [1, "foo"],
-          [2, "bar"]
-        ]
-      },
-      "tbl3": {
-        "schema":[
-          {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
-          {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
-        ],
-        "inputs": [
-          [[1, 10], ["foo1", "foo2"]]
-        ]
-      }
-    },
-    "queries": [
-      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM 
{tbl1}"},
-      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT 
intCol FROM {tbl1} WHERE floatCol <2.5 "},
-      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT 
intCol FROM {tbl1} WHERE floatCol <2.5 "},
-      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION 
ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
-      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE 
strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM 
{tbl1} WHERE strCol = 'baby' "},
-      { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
-      { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT 
intArrayCol, strArrayCol FROM {tbl3}"}
-    ]
-  }
-}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
new file mode 100644
index 00000000000..8b09a61a94b
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsH2.json
@@ -0,0 +1,85 @@
+{
+  "set_op_test_h2": {
+    "tables": {
+      "tbl1": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, 8, 3.0, 5.176518e16, "lyons"],
+          [2, 9, 4.0, 4.608155e11, "onan"],
+          [3, 14, 5.0, 1.249261e11, "rudvalis"],
+          [4, 21, 6.0, 8.677557e19, "janko"],
+          [1, 41, 2.0, 4.15478e33, "baby"],
+          [2, 46, 1.0, 8.08017e53, "monster"]
+        ]
+      },
+      "tbl2": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "foo"],
+          [2, "bar"],
+          [1, "bar"]
+        ]
+      },
+      "tbl3": {
+        "schema":[
+          {"name": "intArrayCol", "type": "INT", "isSingleValue": false},
+          {"name": "strArrayCol", "type": "STRING", "isSingleValue": false}
+        ],
+        "inputs": [
+          [[1, 10], ["foo1", "foo2"]]
+        ]
+      }
+    },
+    "queries": [
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM 
{tbl2}"},
+      { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2}"},
+      { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM 
{tbl2}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION (SELECT intCol FROM {tbl1} WHERE intCol > 2)", "ignoreV2Optimizer":  
true},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} EXCEPT SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION ALL SELECT intCol FROM {tbl2} UNION ALL SELECT intCol FROM 
{tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION ALL SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer": true},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} EXCEPT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} EXCEPT SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} MINUS SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} INTERSECT SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} INTERSECT SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}"},
+      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 MINUS SELECT 
intCol FROM {tbl1} WHERE floatCol <2.5 "},
+      { "sql": "SELECT intCol FROM {tbl1} WHERE floatCol > 2.5 EXCEPT SELECT 
intCol FROM {tbl1} WHERE floatCol <2.5 "},
+      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} UNION 
ALL SELECT intCol, longCol, doubleCol, strCol FROM {tbl1}"},
+      { "sql": "SELECT intCol, longCol, doubleCol, strCol FROM {tbl1} WHERE 
strCol = 'monster' UNION ALL SELECT intCol, longCol, doubleCol, strCol FROM 
{tbl1} WHERE strCol = 'baby' "},
+      { "sql": "SELECT * FROM {tbl2} UNION ALL SELECT * FROM {tbl2}"},
+      { "sql": "SELECT intArrayCol, strArrayCol FROM {tbl3} UNION ALL SELECT 
intArrayCol, strArrayCol FROM {tbl3}"},
+      { "sql": "SELECT intCol FROM {tbl1} UNION SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}", "ignoreV2Optimizer":  true},
+      { "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} UNION SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}"}
+    ]
+  }
+}
diff --git a/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
new file mode 100644
index 00000000000..70f79836e8e
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/SetOpsNonH2.json
@@ -0,0 +1,175 @@
+{
+  "set_op_test_non_h2": {
+    "tables": {
+      "tbl1": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "longCol", "type": "LONG"},
+          {"name": "floatCol", "type": "FLOAT"},
+          {"name": "doubleCol", "type": "DOUBLE"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, 8, 3.0, 5.176518e16, "lyons"],
+          [2, 9, 4.0, 4.608155e11, "onan"],
+          [3, 14, 5.0, 1.249261e11, "rudvalis"],
+          [4, 21, 6.0, 8.677557e19, "janko"],
+          [1, 41, 2.0, 4.15478e33, "baby"],
+          [2, 46, 1.0, 8.08017e53, "monster"]
+        ]
+      },
+      "tbl2": {
+        "schema":[
+          {"name": "intCol", "type": "INT"},
+          {"name": "strCol", "type": "STRING"}
+        ],
+        "inputs": [
+          [1, "foo"],
+          [2, "bar"],
+          [1, "bar"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "INTERSECT ALL is not supported by H2, so we use hardcoded output to validate the query",
+        "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM {tbl2}",
+        "outputs": [
+          [1],
+          [2],
+          [1]
+        ]
+      },
+      {
+        "description": "EXCEPT ALL is not supported by H2, so we use hardcoded output to validate the query",
+        "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2}",
+        "outputs": [
+          [3],
+          [4],
+          [2]
+        ]
+      },
+      {
+        "description": "MINUS ALL is not supported by H2, so we use hardcoded output to validate the query",
+        "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2}",
+        "outputs": [
+          [3],
+          [4],
+          [2]
+        ]
+      },
+      {
+        "description": "INTERSECT ALL with UNION",
+        "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM 
{tbl2} UNION SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [1],
+          [2],
+          [3],
+          [4]
+        ],
+        "ignoreV2Optimizer":  true
+      },
+      {
+        "description": "INTERSECT ALL with UNION",
+        "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} INTERSECT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [1],
+          [2],
+          [3],
+          [4]
+        ],
+        "ignoreV2Optimizer":  true
+      },
+      {
+        "description": "INTERSECT ALL with UNION ALL",
+        "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM 
{tbl2} UNION ALL SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [1],
+          [2],
+          [1],
+          [1],
+          [2],
+          [3],
+          [4],
+          [1],
+          [2]
+        ]
+      },
+      {
+        "description": "INTERSECT ALL with INTERSECT",
+        "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM 
{tbl2} INTERSECT SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [1],
+          [2]
+        ]
+      },
+      {
+        "description": "INTERSECT ALL with EXCEPT",
+        "sql": "SELECT intCol FROM {tbl1} INTERSECT ALL SELECT intCol FROM 
{tbl2} EXCEPT SELECT intCol FROM {tbl1}",
+        "outputs": [
+        ]
+      },
+      {
+        "description": "EXCEPT ALL with UNION",
+        "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} 
UNION SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [3],
+          [4],
+          [1],
+          [2]
+        ],
+        "ignoreV2Optimizer":  true
+      },
+      {
+        "description": "EXCEPT ALL with UNION",
+        "sql": "SET skipPlannerRules='UnionToDistinct'; SELECT intCol FROM 
{tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} UNION SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [3],
+          [4],
+          [1],
+          [2]
+        ]
+      },
+      {
+        "description": "EXCEPT ALL with UNION ALL",
+        "sql": "SELECT intCol FROM {tbl1} EXCEPT ALL SELECT intCol FROM {tbl2} 
UNION ALL SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [3],
+          [4],
+          [2],
+          [1],
+          [2],
+          [3],
+          [4],
+          [1],
+          [2]
+        ]
+      },
+      {
+        "description": "MINUS ALL with INTERSECT",
+        "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} 
INTERSECT SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [3],
+          [4],
+          [1],
+          [2]
+        ]
+      },
+      {
+        "description": "MINUS ALL with EXCEPT",
+        "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} 
EXCEPT SELECT intCol FROM {tbl1}",
+        "outputs": [
+        ]
+      },
+      {
+        "description": "MINUS ALL with INTERSECT ALL",
+        "sql": "SELECT intCol FROM {tbl1} MINUS ALL SELECT intCol FROM {tbl2} 
INTERSECT ALL SELECT intCol FROM {tbl1}",
+        "outputs": [
+          [3],
+          [4],
+          [2]
+        ]
+      }
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to