This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 313d937236 Switch operators to a push-style API (#13600)
313d937236 is described below

commit 313d937236efa242bc7b3ae144b50405a0499761
Author: imply-cheddar <[email protected]>
AuthorDate: Fri Dec 23 15:01:55 2022 +0900

    Switch operators to a push-style API (#13600)
    
    * Switch operators to a push-style API
    
    This API generates nice stack-traces of processing
    for Operators.
---
 .../query/operator/NaivePartitioningOperator.java  |  65 +++----
 .../org/apache/druid/query/operator/Operator.java  |  92 ++++------
 .../druid/query/operator/OperatorSequence.java     | 105 +++++-------
 .../operator/SegmentToRowsAndColumnsOperator.java  |  46 +++--
 .../druid/query/operator/SequenceOperator.java     |  57 +------
 .../query/operator/WindowProcessorOperator.java    |  37 ++--
 .../org/apache/druid/segment/ArrayListSegment.java |  26 ++-
 .../druid/segment/CloseableShapeshifter.java       |  45 +++++
 .../druid/query/operator/ExceptionalReceiver.java} |  41 +----
 .../druid/query/operator/InlineScanOperator.java   |  25 +--
 .../operator/NaivePartitioningOperatorTest.java    |  72 +++++---
 .../druid/query/operator/OperatorSequenceTest.java |  58 +++++--
 .../druid/query/operator/OperatorTestHelper.java   | 156 +++++++++++++++++
 .../SegmentToRowsAndColumnsOperatorTest.java       | 187 +++++++++++++++++++++
 .../druid/query/operator/SequenceOperatorTest.java |  26 ++-
 .../operator/WindowProcessorOperatorTest.java      |  13 +-
 .../operator/window/RowsAndColumnsHelper.java      |   7 +
 .../window/WindowFramedAggregateProcessorTest.java |   5 +-
 .../rowsandcols/AsOnlyTestRowsAndColumns.java      |  17 --
 .../org/apache/druid/segment/TestSegmentForAs.java |  82 +++++++++
 20 files changed, 765 insertions(+), 397 deletions(-)

diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
index 15b4344b4f..71015a2c59 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.query.operator;
 
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import 
org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
 import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
@@ -53,48 +52,34 @@ public class NaivePartitioningOperator implements Operator
   }
 
   @Override
-  public void open()
+  public void go(Receiver receiver)
   {
-    child.open();
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    if (partitionsIter != null && partitionsIter.hasNext()) {
-      return partitionsIter.next();
-    }
-
-    if (child.hasNext()) {
-      final RowsAndColumns rac = child.next();
-
-      SortedGroupPartitioner groupPartitioner = 
rac.as(SortedGroupPartitioner.class);
-      if (groupPartitioner == null) {
-        groupPartitioner = new DefaultSortedGroupPartitioner(rac);
-      }
-
-      partitionsIter = 
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
-      return partitionsIter.next();
-    }
+    child.go(
+        new Receiver()
+        {
+          @Override
+          public boolean push(RowsAndColumns rac)
+          {
+            SortedGroupPartitioner groupPartitioner = 
rac.as(SortedGroupPartitioner.class);
+            if (groupPartitioner == null) {
+              groupPartitioner = new DefaultSortedGroupPartitioner(rac);
+            }
 
-    throw new ISE("Asked for next when already complete");
-  }
+            partitionsIter = 
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
 
-  @Override
-  public boolean hasNext()
-  {
-    if (partitionsIter != null && partitionsIter.hasNext()) {
-      return true;
-    }
+            boolean keepItGoing = true;
+            while (keepItGoing && partitionsIter.hasNext()) {
+              keepItGoing = receiver.push(partitionsIter.next());
+            }
+            return keepItGoing;
+          }
 
-    return child.hasNext();
-  }
-
-  @Override
-  public void close(boolean cascade)
-  {
-    if (cascade) {
-      child.close(cascade);
-    }
+          @Override
+          public void completed()
+          {
+            receiver.completed();
+          }
+        }
+    );
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/Operator.java 
b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
index ad64cd1a52..41ba4e804e 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
@@ -22,72 +22,48 @@ package org.apache.druid.query.operator;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 /**
- * An Operator interface that intends to align closely with the Operators that 
other databases would also tend
- * to be implemented using.
+ * An Operator interface that intends to have implementations that align 
relatively closely with the Operators that
+ * other databases would also tend to be implemented using.  While a lot of 
Operator interfaces tend to use a
+ * pull-based orientation, we use a push-based interface.  This is to give us 
good stacktraces.  Because of the
+ * organization of the go() method, the stack traces thrown out of an Operator 
will be
+ * 1. All of the go() calls from the top-level Operator down to the leaf 
Operator; this part of the stacktrace gives
+ * visibility into all of the actions that we expect the operator chain to 
perform
+ * 2. All of the push() calls up until the exception happens, this part of the 
stack trace gives us a view of all
+ * of the things that have happened to the data up until the exception was 
thrown.
  * <p>
- * The lifecycle of an operator is that, after creation, it should be opened, 
and then iterated using hasNext() and
- * next().  Finally, when the Operator is no longer useful, it should be 
closed.
+ * This "hour glass" structure of the stacktrace is by design.  It is very 
important that implementations of this
+ * interface never resort to a fluent style, inheritance or other code 
structuring that removes the name of the active
+ * Operator from the stacktrace.  It should always be possible to find ways to 
avoid code duplication and still keep
+ * the Operator's name on the stacktrace.
  * <p>
- * Operator's methods mimic the methods of an {@code Iterator}, but it does 
not implement {@code Iterator}
- * intentionally.  An operator should never be wrapped in an {@code Iterator}. 
 Any code that does that should be
- * considered a bug and fixed.  This is for two reasons:
- * <p>
- * 1. An Operator should never be passed around as an {@code Iterator}.  An 
Operator must be closed, if an operator
- * gets returned as an {@code Iterator}, the code that sees the {@code 
Iterator} loses the knowledge that it's
- * dealing with an Operator and might not close it.  Even something like a 
{@code CloseableIterator} is an
- * anti-pattern as it's possible to use it in a functional manner with code 
that loses track of the fact that it
- * must be closed.
- * 2. To avoid "fluent" style composition of functions on Operators.  It is 
important that there never be a set of
- * functional primitives for things like map/filter/reduce to "simplify" the 
implementation of Operators.  This is
- * because such fluency produces really hard to decipher stack traces as the 
stacktrace ends up being just a bunch
- * of calls from the scaffolding (map/filter/reduce) and not from the actual 
Operator itself.  By not implementing
- * {@code Iterator} we are actively increasing the burden of trying to add 
such functional operations to the point
- * that hopefully, though code review, we can ensure that we never develop 
them.  It is infinitely better to preserve
- * the stacktrace and "duplicate" the map/filter/reduce scaffolding code.
+ * The other benefit of the go() method is that it fully encapsulates the 
lifecycle of the underlying resources.
+ * This means that it should be possible to use try/finally blocks around 
calls to go() in order to ensure that
+ * resources are properly closed.
  */
 public interface Operator
 {
   /**
-   * Called to initiate the lifecycle of the Operator.  If an operator needs 
to checkout resources or anything to do
-   * its work, this is probably the place to do it.
+   * Tells the Operator to start doing its work.  Data will be pushed into 
the Receiver.
    *
-   * Work should *never* be done in this method, this method only exists to 
acquire resources that are known to be
-   * needed before doing any work.  As a litmus test, if there is ever a call 
to `op.next()` inside of this method,
-   * then something has been done wrong as that call to `.next()` is actually 
doing work.  Such code should be moved
-   * into being lazily evaluated as part of a call to `.next()`.
+   * @param receiver a receiver that will receive data
    */
-  void open();
+  void go(Receiver receiver);
 
-  /**
-   * Returns the next RowsAndColumns object that the Operator can produce.  
Behavior is undefined if
-   * {@link #hasNext} returns false.
-   *
-   * @return the next RowsAndColumns object that the operator can produce
-   */
-  RowsAndColumns next();
+  interface Receiver
+  {
+    /**
+     * Used to push data.  Return value indicates if more data will be 
accepted.  If false, push should not
+     * be called anymore.
+     *
+     * @param rac {@link RowsAndColumns} of data
+     * @return a boolean value indicating if more data will be accepted.  If 
false, push should never be called
+     * anymore
+     */
+    boolean push(RowsAndColumns rac);
 
-  /**
-   * Used to identify if it is safe to call {@link #next}
-   *
-   * @return true if it is safe to call {@link #next}
-   */
-  boolean hasNext();
-
-  /**
-   * Closes this Operator.  The cascade flag can be used to identify that the 
intent is to close this operator
-   * and only this operator without actually closing child operators.  Other 
databases us this sort of functionality
-   * with a planner that is watching over all of the objects and force-closes 
even if they were closed during normal
-   * operations.  In Druid, in the data pipeline where this was introduced, we 
are guaranteed to always have close
-   * called regardless of errors or exceptions during processing, as such, at 
time of introduction, there is no
-   * call that passes false for cascade.
-   * <p>
-   * That said, given that this is a common thing for these interfaces for 
other databases, we want to preserve the
-   * optionality of being able to leverage what they do.  As such, we define 
the method this way with the belief
-   * that it might be used in the future.  Semantically, this means that all 
implementations of Operators must
-   * expect to be closed multiple times.  I.e. after being closed, it is an 
error for open, next or hasNext to be
-   * called, but close can be called any number of times.
-   *
-   * @param cascade whether to call close on child operators.
-   */
-  void close(boolean cascade);
+    /**
+     * Used to indicate that no more data will ever come
+     */
+    void completed();
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
index 45a3bbd238..06fe9db8e7 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
@@ -22,12 +22,21 @@ package org.apache.druid.query.operator;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.guava.YieldingAccumulator;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 import java.util.function.Supplier;
 
+/**
+ * Provides a sequence on top of Operators.  The mismatch between pull (Sequence) 
and push (Operator) means that, if we
+ * choose to support the Yielder interface, we have to use threading.  
Managing extra threads in order to do that
+ * is unfortunate, so, we choose to take a bit of a cop-out approach.
+ *
+ * Specifically, the accumulate method doesn't actually have the same problem 
and the query pipeline after the merge
+ * functions is composed of Sequences that all use accumulate instead of 
yielder.  Thus, if we are certain that
+ * we only use the OperatorSequence in places where toYielder is not called 
(i.e. it's only used as the return
+ * value of the merge() calls), then we can get away with only implementing 
the accumulate path.
+ */
 public class OperatorSequence implements Sequence<RowsAndColumns>
 {
   private final Supplier<Operator> opSupplier;
@@ -41,24 +50,13 @@ public class OperatorSequence implements 
Sequence<RowsAndColumns>
 
   @Override
   public <OutType> OutType accumulate(
-      OutType initValue,
+      final OutType initValue,
       Accumulator<OutType, RowsAndColumns> accumulator
   )
   {
-    Operator op = null;
-    try {
-      op = opSupplier.get();
-      op.open();
-      while (op.hasNext()) {
-        initValue = accumulator.accumulate(initValue, op.next());
-      }
-      return initValue;
-    }
-    finally {
-      if (op != null) {
-        op.close(true);
-      }
-    }
+    final MyReceiver<OutType> receiver = new MyReceiver<>(initValue, 
accumulator);
+    opSupplier.get().go(receiver);
+    return receiver.getRetVal();
   }
 
   @Override
@@ -67,59 +65,38 @@ public class OperatorSequence implements 
Sequence<RowsAndColumns>
       YieldingAccumulator<OutType, RowsAndColumns> accumulator
   )
   {
-    final Operator op = opSupplier.get();
-    try {
-      op.open();
-
-      while (!accumulator.yielded() && op.hasNext()) {
-        initValue = accumulator.accumulate(initValue, op.next());
-      }
-      if (accumulator.yielded()) {
-        OutType finalInitValue = initValue;
-        return new Yielder<OutType>()
-        {
-          private OutType retVal = finalInitValue;
-          private boolean done = false;
+    // As mentioned in the class-level javadoc, we skip this implementation 
and leave it up to the developer to
+    // only use this class in "safe" locations.
+    throw new UnsupportedOperationException("Cannot convert an Operator to a 
Yielder");
+  }
 
-          @Override
-          public OutType get()
-          {
-            return retVal;
-          }
+  private static class MyReceiver<OutType> implements Operator.Receiver
+  {
+    private final Accumulator<OutType, RowsAndColumns> accumulator;
+    private OutType retVal;
 
-          @Override
-          public Yielder<OutType> next(OutType initValue)
-          {
-            accumulator.reset();
-            retVal = initValue;
-            while (!accumulator.yielded() && op.hasNext()) {
-              retVal = accumulator.accumulate(retVal, op.next());
-            }
-            if (!accumulator.yielded()) {
-              done = true;
-            }
-            return this;
-          }
+    public MyReceiver(OutType initValue, Accumulator<OutType, RowsAndColumns> 
accumulator)
+    {
+      this.accumulator = accumulator;
+      retVal = initValue;
+    }
 
-          @Override
-          public boolean isDone()
-          {
-            return done;
-          }
+    public OutType getRetVal()
+    {
+      return retVal;
+    }
 
-          @Override
-          public void close()
-          {
-            op.close(true);
-          }
-        };
-      } else {
-        return Yielders.done(initValue, () -> op.close(true));
-      }
+    @Override
+    public boolean push(RowsAndColumns rac)
+    {
+      retVal = accumulator.accumulate(retVal, rac);
+      return true;
     }
-    catch (RuntimeException e) {
-      op.close(true);
-      throw e;
+
+    @Override
+    public void completed()
+    {
+
     }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
index dc912bfdd6..67a847d81d 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
@@ -20,13 +20,16 @@
 package org.apache.druid.query.operator;
 
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.CloseableShapeshifter;
 import org.apache.druid.segment.Segment;
 
+import java.io.IOException;
+
 public class SegmentToRowsAndColumnsOperator implements Operator
 {
   private final Segment segment;
-  private boolean hasNext = true;
 
   public SegmentToRowsAndColumnsOperator(
       Segment segment
@@ -36,33 +39,22 @@ public class SegmentToRowsAndColumnsOperator implements 
Operator
   }
 
   @Override
-  public void open()
+  public void go(Receiver receiver)
   {
-
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    hasNext = false;
-
-    RowsAndColumns rac = segment.as(RowsAndColumns.class);
-    if (rac != null) {
-      return rac;
+    try (final CloseableShapeshifter shifty = 
segment.as(CloseableShapeshifter.class)) {
+      if (shifty == null) {
+        throw new ISE("Segment[%s] cannot shapeshift", segment.getClass());
+      }
+
+      RowsAndColumns rac = shifty.as(RowsAndColumns.class);
+      if (rac == null) {
+        throw new ISE("Cannot work with segment of type[%s]", 
segment.getClass());
+      }
+      receiver.push(rac);
+      receiver.completed();
+    }
+    catch (IOException e) {
+      throw new RE(e, "Problem closing resources for segment[%s]", 
segment.getId());
     }
-
-    throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
-  }
-
-  @Override
-  public boolean hasNext()
-  {
-    return hasNext;
-  }
-
-  @Override
-  public void close(boolean cascade)
-  {
-
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
index 9dc54f9576..4014a10e6c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
@@ -19,21 +19,12 @@
 
 package org.apache.druid.query.operator;
 
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
 public class SequenceOperator implements Operator
 {
   private final Sequence<RowsAndColumns> child;
-  private Yielder<RowsAndColumns> yielder;
-  private boolean closed = false;
 
   public SequenceOperator(
       Sequence<RowsAndColumns> child
@@ -43,45 +34,15 @@ public class SequenceOperator implements Operator
   }
 
   @Override
-  public void open()
-  {
-    if (closed) {
-      throw new ISE("Operator closed, cannot be re-opened");
-    }
-    yielder = Yielders.each(child);
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    if (closed) {
-      throw new NoSuchElementException();
-    }
-    final RowsAndColumns retVal = yielder.get();
-    yielder = yielder.next(null);
-    return retVal;
-  }
-
-  @Override
-  public boolean hasNext()
-  {
-    return !closed && !yielder.isDone();
-  }
-
-  @Override
-  public void close(boolean cascade)
+  public void go(Receiver receiver)
   {
-    if (closed) {
-      return;
-    }
-    try {
-      yielder.close();
-    }
-    catch (IOException e) {
-      throw new RE(e, "Exception when closing yielder from Sequence");
-    }
-    finally {
-      closed = true;
-    }
+    child.accumulate(
+        null,
+        (accumulated, in) -> {
+          receiver.push(in);
+          return accumulated;
+        }
+    );
+    receiver.completed();
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
 
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
index d95f598d39..02bf780f55 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
+++ 
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
@@ -40,28 +40,21 @@ public class WindowProcessorOperator implements Operator
   }
 
   @Override
-  public void open()
+  public void go(Receiver receiver)
   {
-    child.open();
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    return windowProcessor.process(child.next());
-  }
-
-  @Override
-  public boolean hasNext()
-  {
-    return child.hasNext();
-  }
-
-  @Override
-  public void close(boolean cascade)
-  {
-    if (cascade) {
-      child.close(cascade);
-    }
+    child.go(new Receiver()
+    {
+      @Override
+      public boolean push(RowsAndColumns rac)
+      {
+        return receiver.push(windowProcessor.process(rac));
+      }
+
+      @Override
+      public void completed()
+      {
+        receiver.completed();
+      }
+    });
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java 
b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
index 97fe803416..0e597e2934 100644
--- a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
@@ -106,8 +106,8 @@ public class ArrayListSegment<RowType> implements Segment
   @SuppressWarnings("unchecked")
   public <T> T as(Class<T> clazz)
   {
-    if (RowsAndColumns.class.equals(clazz)) {
-      return (T) asRowsAndColumns();
+    if (CloseableShapeshifter.class.equals(clazz)) {
+      return (T) new MyCloseableShapeshifter();
     }
     return null;
   }
@@ -120,7 +120,27 @@ public class ArrayListSegment<RowType> implements Segment
 
   private RowsAndColumns asRowsAndColumns()
   {
-    return new ArrayListRowsAndColumns(rows, rowAdapter, rowSignature);
+    return new ArrayListRowsAndColumns<>(rows, rowAdapter, rowSignature);
   }
 
+  private class MyCloseableShapeshifter implements CloseableShapeshifter
+  {
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Nullable
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T as(Class<T> clazz)
+    {
+      if (RowsAndColumns.class.equals(clazz)) {
+        return (T) asRowsAndColumns();
+      }
+      return null;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java 
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
new file mode 100644
index 0000000000..f482cf04f4
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+/**
+ * A CloseableShapeshifter is an interface created to allow Segments to be 
used from {@link #as(Class)}, but also to
+ * be able to ensure that any resources used by the object returned from the 
{@link #as(Class)} method have proper
+ * management of their lifecycle.  This was initially introduced in order to 
make it possible for {@link Segment} to
+ * become a {@link org.apache.druid.query.rowsandcols.RowsAndColumns} without 
needing to add extra close() methods to
+ * {@link org.apache.druid.query.rowsandcols.RowsAndColumns}.
+ */
+public interface CloseableShapeshifter extends Closeable
+{
+  /**
+   * Asks the Object to return itself as a concrete implementation of a 
specific interface.  The interface
+   * asked for will tend to be a semantically-meaningful interface.
+   *
+   * @param clazz A class object representing the interface that the calling 
code wants a concrete implementation of
+   * @param <T>   The interface that the calling code wants a concrete 
implementation of
+   * @return A concrete implementation of the interface, or null if there is 
no meaningful optimization to be had
+   * through a local implementation of the interface.
+   */
+  @Nullable
+  <T> T as(Class<T> clazz);
+}
diff --git 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
 
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
similarity index 57%
copy from 
processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
copy to 
processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
index dc912bfdd6..b4c52e1950 100644
--- 
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
@@ -19,50 +19,19 @@
 
 package org.apache.druid.query.operator;
 
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import org.apache.druid.segment.Segment;
 
-public class SegmentToRowsAndColumnsOperator implements Operator
+public class ExceptionalReceiver implements Operator.Receiver
 {
-  private final Segment segment;
-  private boolean hasNext = true;
-
-  public SegmentToRowsAndColumnsOperator(
-      Segment segment
-  )
-  {
-    this.segment = segment;
-  }
-
-  @Override
-  public void open()
-  {
-
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    hasNext = false;
-
-    RowsAndColumns rac = segment.as(RowsAndColumns.class);
-    if (rac != null) {
-      return rac;
-    }
-
-    throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
-  }
-
   @Override
-  public boolean hasNext()
+  public boolean push(RowsAndColumns rac)
   {
-    return hasNext;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public void close(boolean cascade)
+  public void completed()
   {
-
+    throw new UnsupportedOperationException();
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
 
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
index dbe2adf830..1d7b68887b 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
@@ -49,25 +49,12 @@ public class InlineScanOperator implements Operator
   }
 
   @Override
-  public void open()
+  public void go(Receiver receiver)
   {
-  }
-
-  @Override
-  public RowsAndColumns next()
-  {
-    return iter.next();
-  }
-
-  @Override
-  public boolean hasNext()
-  {
-    return iter.hasNext();
-  }
-
-  @Override
-  public void close(boolean cascade)
-  {
-    iter = null;
+    boolean keepItGoing = true;
+    while (keepItGoing && iter.hasNext()) {
+      keepItGoing = receiver.push(iter.next());
+    }
+    receiver.completed();
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
 
b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
index 218fde166b..6dbb7898b5 100644
--- 
a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
@@ -29,9 +29,6 @@ import 
org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Arrays;
-import java.util.List;
-
 public class NaivePartitioningOperatorTest
 {
   @Test
@@ -49,30 +46,49 @@ public class NaivePartitioningOperatorTest
         InlineScanOperator.make(rac)
     );
 
-    op.open();
+    new OperatorTestHelper()
+        .expectRowsAndColumns(
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{0, 0, 0})
+                .expectColumn("unsorted", new int[]{3, 54, 21}),
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{1, 1})
+                .expectColumn("unsorted", new int[]{1, 5}),
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{2})
+                .expectColumn("unsorted", new int[]{54}),
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{4, 4, 4})
+                .expectColumn("unsorted", new int[]{2, 3, 92})
+        )
+        .runToCompletion(op);
+  }
 
-    List<RowsAndColumnsHelper> expectations = Arrays.asList(
-        new RowsAndColumnsHelper()
-            .expectColumn("sorted", new int[]{0, 0, 0})
-            .expectColumn("unsorted", new int[]{3, 54, 21}),
-        new RowsAndColumnsHelper()
-            .expectColumn("sorted", new int[]{1, 1})
-            .expectColumn("unsorted", new int[]{1, 5}),
-        new RowsAndColumnsHelper()
-            .expectColumn("sorted", new int[]{2})
-            .expectColumn("unsorted", new int[]{54}),
-        new RowsAndColumnsHelper()
-            .expectColumn("sorted", new int[]{4, 4, 4})
-            .expectColumn("unsorted", new int[]{2, 3, 92})
+  @Test
+  public void testStopMidStream()
+  {
+    RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
+        ImmutableMap.of(
+            "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
+            "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2, 
3, 92})
+        )
     );
 
-    for (RowsAndColumnsHelper expectation : expectations) {
-      Assert.assertTrue(op.hasNext());
-      expectation.validate(op.next());
-    }
-    Assert.assertFalse(op.hasNext());
+    NaivePartitioningOperator op = new NaivePartitioningOperator(
+        ImmutableList.of("sorted"),
+        InlineScanOperator.make(rac)
+    );
 
-    op.close(true);
+    new OperatorTestHelper()
+        .expectAndStopAfter(
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{0, 0, 0})
+                .expectColumn("unsorted", new int[]{3, 54, 21}),
+            new RowsAndColumnsHelper()
+                .expectColumn("sorted", new int[]{1, 1})
+                .expectColumn("unsorted", new int[]{1, 5})
+        )
+        .runToCompletion(op);
   }
 
   @Test
@@ -90,11 +106,17 @@ public class NaivePartitioningOperatorTest
         InlineScanOperator.make(rac)
     );
 
-    op.open();
 
     boolean exceptionThrown = false;
     try {
-      op.next();
+      new OperatorTestHelper()
+          .withPushFn(
+              rac1 -> {
+                Assert.fail("I shouldn't be called, an exception should've been thrown.");
+                return true;
+              }
+          )
+          .runToCompletion(op);
     }
     catch (ISE ex) {
       Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not in order", ex.getMessage());
diff --git a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
index d4b0cdb128..875ef52bd3 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.operator;
 
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
 import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
@@ -30,27 +31,54 @@ import org.junit.Test;
 public class OperatorSequenceTest
 {
   @Test
-  public void testSanity()
+  public void testAccumulateButNoYielder()
   {
     OperatorSequence seq = new OperatorSequence(
         () -> InlineScanOperator.make(MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1})))
     );
 
-    Assert.assertEquals(1, seq.accumulate(0, (accumulated, in) -> accumulated + 1).intValue());
+    final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
+        .expectColumn("hi", new int[]{1})
+        .allColumnsRegistered();
 
-    Yielder<Integer> yielder = seq.toYielder(0, new YieldingAccumulator<Integer, RowsAndColumns>()
-    {
-      @Override
-      public Integer accumulate(Integer accumulated, RowsAndColumns in)
+    Assert.assertEquals(
+        1,
+        seq.accumulate(
+            0,
+            (accumulated, in) -> {
+              helper.validate(in);
+              return accumulated + 1;
+            }
+        ).intValue()
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      Yielder<Integer> yielder = seq.toYielder(0, new YieldingAccumulator<Integer, RowsAndColumns>()
       {
-        yield();
-        return accumulated + 1;
-      }
-    });
-    Assert.assertFalse(yielder.isDone());
-    Assert.assertEquals(1, yielder.get().intValue());
-
-    yielder = yielder.next(0);
-    Assert.assertTrue(yielder.isDone());
+        @Override
+        public Integer accumulate(Integer accumulated, RowsAndColumns in)
+        {
+          Assert.fail("This should never be called, because we expect a UOE before this point");
+          this.yield();
+          helper.validate(in);
+          return accumulated + 1;
+        }
+      });
+
+      // The exception will have been thrown before this point, in which case one might wonder why the code here
+      // remains.  It is because this code is a correct validation of what should happen if OperatorSequence *did*
+      // implement the Yielder.  It's kept for posterity in case we ever choose to implement it using threads.
+      Assert.assertFalse(yielder.isDone());
+      Assert.assertEquals(1, yielder.get().intValue());
+
+      yielder = yielder.next(0);
+      Assert.assertTrue(yielder.isDone());
+    }
+    catch (UnsupportedOperationException ex) {
+      Assert.assertEquals("Cannot convert an Operator to a Yielder", ex.getMessage());
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
new file mode 100644
index 0000000000..b1bace9aa3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class OperatorTestHelper
+{
+  private Supplier<TestReceiver> receiverSupply;
+  private Consumer<TestReceiver> finalValidation;
+
+  public OperatorTestHelper expectRowsAndColumns(RowsAndColumnsHelper... helpers)
+  {
+    return withPushFn(
+        new JustPushMe()
+        {
+          int index = 0;
+
+          @Override
+          public boolean push(RowsAndColumns rac)
+          {
+            helpers[index++].validate(rac);
+            return true;
+          }
+        }
+    ).withFinalValidation(
+        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
+    );
+  }
+
+  public OperatorTestHelper expectAndStopAfter(RowsAndColumnsHelper... helpers)
+  {
+    return withPushFn(
+        new JustPushMe()
+        {
+          int index = 0;
+
+          @Override
+          public boolean push(RowsAndColumns rac)
+          {
+            helpers[index++].validate(rac);
+            return index < helpers.length;
+          }
+        }
+    ).withFinalValidation(
+        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
+    );
+  }
+
+  public OperatorTestHelper withReceiver(Supplier<TestReceiver> receiver)
+  {
+    if (this.receiverSupply != null) {
+      throw new ISE("Receiver[%s] already set, cannot set it again[%s].", this.receiverSupply, receiver);
+    }
+    this.receiverSupply = receiver;
+    return this;
+  }
+
+  public OperatorTestHelper withFinalValidation(Consumer<TestReceiver> validator)
+  {
+    if (finalValidation == null) {
+      this.finalValidation = validator;
+    } else {
+      final Consumer<TestReceiver> subValidator = finalValidation;
+      this.finalValidation = (receiver) -> {
+        subValidator.accept(receiver);
+        validator.accept(receiver);
+      };
+    }
+    return this;
+  }
+
+  public OperatorTestHelper withPushFn(JustPushMe fn)
+  {
+    return withReceiver(() -> new TestReceiver(fn));
+  }
+
+  public OperatorTestHelper runToCompletion(Operator op)
+  {
+    TestReceiver receiver = this.receiverSupply.get();
+    op.go(receiver);
+    Assert.assertTrue(receiver.isCompleted());
+    if (finalValidation != null) {
+      finalValidation.accept(receiver);
+    }
+    return this;
+  }
+
+  public interface JustPushMe
+  {
+    boolean push(RowsAndColumns rac);
+  }
+
+  public static class TestReceiver implements Operator.Receiver
+  {
+    private final JustPushMe pushFn;
+
+    private AtomicLong numPushed = new AtomicLong();
+    private AtomicBoolean completed = new AtomicBoolean(false);
+
+    public TestReceiver(JustPushMe pushFn)
+    {
+      this.pushFn = pushFn;
+    }
+
+    @Override
+    public boolean push(RowsAndColumns rac)
+    {
+      numPushed.incrementAndGet();
+      return pushFn.push(rac);
+    }
+
+    public boolean isCompleted()
+    {
+      return completed.get();
+    }
+
+    @Override
+    public void completed()
+    {
+      if (!completed.compareAndSet(false, true)) {
+        throw new ISE("complete called more than once!?  Why.");
+      }
+    }
+
+    public long getNumPushed()
+    {
+      return numPushed.get();
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
new file mode 100644
index 0000000000..0cd1f1b588
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.segment.ArrayListSegment;
+import org.apache.druid.segment.CloseableShapeshifter;
+import org.apache.druid.segment.TestSegmentForAs;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SegmentToRowsAndColumnsOperatorTest
+{
+  @Test
+  public void testSanity()
+  {
+    ArrayList<Object[]> rows = Lists.newArrayList(
+        new Object[]{1, 2, "a"},
+        new Object[]{1, 2, "b"}
+    );
+
+    ArrayListSegment<Object[]> segment = new ArrayListSegment<>(
+        SegmentId.dummy("test"),
+        rows,
+        columnName -> objects -> objects[Integer.parseInt(columnName)],
+        RowSignature.builder()
+                    .add("0", ColumnType.LONG)
+                    .add("1", ColumnType.DOUBLE)
+                    .add("2", ColumnType.STRING).build()
+    );
+    final SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(segment);
+
+    new OperatorTestHelper()
+        .expectRowsAndColumns(
+            new RowsAndColumnsHelper()
+                .expectColumn("0", new long[]{1, 1})
+                .expectColumn("1", new double[]{2, 2})
+                .expectColumn("2", ColumnType.STRING, "a", "b")
+                .allColumnsRegistered()
+        )
+        .runToCompletion(op);
+  }
+
+  @Test
+  public void testNotShapeshiftable()
+  {
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return null;
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      op.go(new ExceptionalReceiver());
+    }
+    catch (ISE e) {
+      Assert.assertEquals(e.getMessage(), "Segment[class org.apache.druid.segment.TestSegmentForAs] cannot shapeshift");
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+  }
+
+  @Test
+  public void testCanShiftButNotToARAC()
+  {
+    AtomicBoolean closed = new AtomicBoolean(false);
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return new CloseableShapeshifter()
+          {
+            @Nullable
+            @Override
+            public <T> T as(Class<T> clazz)
+            {
+              Assert.assertEquals(RowsAndColumns.class, clazz);
+              return null;
+            }
+
+            @Override
+            public void close()
+            {
+              closed.set(true);
+            }
+          };
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      op.go(new ExceptionalReceiver());
+    }
+    catch (ISE e) {
+      Assert.assertEquals(
+          e.getMessage(),
+          "Cannot work with segment of type[class org.apache.druid.segment.TestSegmentForAs]"
+      );
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+    Assert.assertTrue(closed.get());
+  }
+
+  @Test
+  public void testExceptionWhileClosing()
+  {
+    final MapOfColumnsRowsAndColumns expectedRac =
+        MapOfColumnsRowsAndColumns.of("0", new IntArrayColumn(new int[]{0, 1}));
+    AtomicBoolean closed = new AtomicBoolean(false);
+
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return new CloseableShapeshifter()
+          {
+            @SuppressWarnings("unchecked")
+            @Nullable
+            @Override
+            public <T> T as(Class<T> clazz)
+            {
+              Assert.assertEquals(RowsAndColumns.class, clazz);
+              return (T) expectedRac;
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+              closed.set(true);
+              throw new IOException("ain't no thang");
+            }
+          };
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      new OperatorTestHelper()
+          .withPushFn(rac -> {
+            Assert.assertSame(expectedRac, rac);
+            return true;
+          })
+          .runToCompletion(op);
+    }
+    catch (RE e) {
+      Assert.assertEquals(
+          e.getMessage(),
+          "Problem closing resources for segment[test_-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z_dummy_version]"
+      );
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+    Assert.assertTrue(closed.get());
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
index d980287972..02070c2436 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
@@ -38,19 +38,17 @@ public class SequenceOperatorTest
         MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1}))
     )));
 
-    op.open();
-
-    RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
-        .expectColumn("hi", new int[]{1})
-        .allColumnsRegistered();
-
-    expectations.validate(op.next());
-    Assert.assertTrue(op.hasNext());
-
-    expectations.validate(op.next());
-    Assert.assertFalse(op.hasNext());
-
-    op.close(true);
-    op.close(false);
+    new OperatorTestHelper()
+        .withPushFn(
+            rac -> {
+              new RowsAndColumnsHelper()
+                  .expectColumn("hi", new int[]{1})
+                  .allColumnsRegistered()
+                  .validate(rac);
+              return true;
+            }
+        )
+        .withFinalValidation(testReceiver -> Assert.assertEquals(2, testReceiver.getNumPushed()))
+        .runToCompletion(op);
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
index a6f1433201..73d3060b80 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
@@ -57,10 +57,13 @@ public class WindowProcessorOperatorTest
         InlineScanOperator.make(rac)
     );
 
-    op.open();
-    Assert.assertTrue(op.hasNext());
-    Assert.assertSame(rac, op.next());
-    Assert.assertFalse(op.hasNext());
-    op.close(true);
+    new OperatorTestHelper()
+        .withPushFn(
+            rowsAndColumns -> {
+              Assert.assertSame(rac, rowsAndColumns);
+              return true;
+            }
+        )
+        .runToCompletion(op);
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
index 7a5d24af03..515d800cb4 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
@@ -96,6 +96,13 @@ public class RowsAndColumnsHelper
     return this;
   }
 
+  public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals)
+  {
+    final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
+    helper.setExpectation(expectedVals);
+    return this;
+  }
+
   public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals, ColumnType type)
   {
     final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
index 43ab540065..706584a1c7 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
@@ -33,8 +33,6 @@ import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-
 @SuppressWarnings("unchecked")
 public class WindowFramedAggregateProcessorTest
 {
@@ -52,9 +50,8 @@ public class WindowFramedAggregateProcessorTest
         "yay", new IntArrayColumn(new int[]{1, 2, 3})
     ));
 
-    final RowsAndColumns processed = proc.process(new AsOnlyTestRowsAndColumns(theFrame, theAggs, rac)
+    final RowsAndColumns processed = proc.process(new AsOnlyTestRowsAndColumns()
     {
-      @Nullable
       @Override
       public <T> T as(Class<T> clazz)
       {
diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
index 685fd6f864..035e06369b 100644
--- a/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
+++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
@@ -20,29 +20,12 @@
 package org.apache.druid.query.rowsandcols;
 
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.operator.window.WindowFrame;
 import org.apache.druid.query.rowsandcols.column.Column;
 
 import java.util.Collection;
 
 public abstract class AsOnlyTestRowsAndColumns implements RowsAndColumns
 {
-  private final WindowFrame theFrame;
-  private final AggregatorFactory[] theAggs;
-  private final MapOfColumnsRowsAndColumns rac;
-
-  public AsOnlyTestRowsAndColumns(
-      WindowFrame theFrame,
-      AggregatorFactory[] theAggs,
-      MapOfColumnsRowsAndColumns rac
-  )
-  {
-    this.theFrame = theFrame;
-    this.theAggs = theAggs;
-    this.rac = rac;
-  }
-
   @Override
   public Collection<String> getColumnNames()
   {
diff --git a/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java b/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java
new file mode 100644
index 0000000000..2a910d0d42
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.function.Function;
+
+@SuppressWarnings("rawtypes")
+public class TestSegmentForAs implements Segment
+{
+  private final SegmentId id;
+  private final Function<Class, Object> asFn;
+
+  public TestSegmentForAs(
+      SegmentId id,
+      Function<Class, Object> asFn
+  )
+  {
+    this.id = id;
+    this.asFn = asFn;
+  }
+
+  @Override
+  public SegmentId getId()
+  {
+    return id;
+  }
+
+  @Override
+  public Interval getDataInterval()
+  {
+    return id.getInterval();
+  }
+
+  @Nullable
+  @Override
+  public QueryableIndex asQueryableIndex()
+  {
+    return as(QueryableIndex.class);
+  }
+
+  @Override
+  public StorageAdapter asStorageAdapter()
+  {
+    return as(StorageAdapter.class);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(@Nonnull Class<T> clazz)
+  {
+    return (T) asFn.apply(clazz);
+  }
+
+  @Override
+  public void close()
+  {
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to