This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 313d937236 Switch operators to a push-style API (#13600)
313d937236 is described below
commit 313d937236efa242bc7b3ae144b50405a0499761
Author: imply-cheddar <[email protected]>
AuthorDate: Fri Dec 23 15:01:55 2022 +0900
Switch operators to a push-style API (#13600)
* Switch operators to a push-style API
This API generates nice stack-traces of processing
for Operators.
---
.../query/operator/NaivePartitioningOperator.java | 65 +++----
.../org/apache/druid/query/operator/Operator.java | 92 ++++------
.../druid/query/operator/OperatorSequence.java | 105 +++++-------
.../operator/SegmentToRowsAndColumnsOperator.java | 46 +++--
.../druid/query/operator/SequenceOperator.java | 57 +------
.../query/operator/WindowProcessorOperator.java | 37 ++--
.../org/apache/druid/segment/ArrayListSegment.java | 26 ++-
.../druid/segment/CloseableShapeshifter.java | 45 +++++
.../druid/query/operator/ExceptionalReceiver.java} | 41 +----
.../druid/query/operator/InlineScanOperator.java | 25 +--
.../operator/NaivePartitioningOperatorTest.java | 72 +++++---
.../druid/query/operator/OperatorSequenceTest.java | 58 +++++--
.../druid/query/operator/OperatorTestHelper.java | 156 +++++++++++++++++
.../SegmentToRowsAndColumnsOperatorTest.java | 187 +++++++++++++++++++++
.../druid/query/operator/SequenceOperatorTest.java | 26 ++-
.../operator/WindowProcessorOperatorTest.java | 13 +-
.../operator/window/RowsAndColumnsHelper.java | 7 +
.../window/WindowFramedAggregateProcessorTest.java | 5 +-
.../rowsandcols/AsOnlyTestRowsAndColumns.java | 17 --
.../org/apache/druid/segment/TestSegmentForAs.java | 82 +++++++++
20 files changed, 765 insertions(+), 397 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
index 15b4344b4f..71015a2c59 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/NaivePartitioningOperator.java
@@ -19,7 +19,6 @@
package org.apache.druid.query.operator;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import
org.apache.druid.query.rowsandcols.semantic.DefaultSortedGroupPartitioner;
import org.apache.druid.query.rowsandcols.semantic.SortedGroupPartitioner;
@@ -53,48 +52,34 @@ public class NaivePartitioningOperator implements Operator
}
@Override
- public void open()
+ public void go(Receiver receiver)
{
- child.open();
- }
-
- @Override
- public RowsAndColumns next()
- {
- if (partitionsIter != null && partitionsIter.hasNext()) {
- return partitionsIter.next();
- }
-
- if (child.hasNext()) {
- final RowsAndColumns rac = child.next();
-
- SortedGroupPartitioner groupPartitioner =
rac.as(SortedGroupPartitioner.class);
- if (groupPartitioner == null) {
- groupPartitioner = new DefaultSortedGroupPartitioner(rac);
- }
-
- partitionsIter =
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
- return partitionsIter.next();
- }
+ child.go(
+ new Receiver()
+ {
+ @Override
+ public boolean push(RowsAndColumns rac)
+ {
+ SortedGroupPartitioner groupPartitioner =
rac.as(SortedGroupPartitioner.class);
+ if (groupPartitioner == null) {
+ groupPartitioner = new DefaultSortedGroupPartitioner(rac);
+ }
- throw new ISE("Asked for next when already complete");
- }
+ partitionsIter =
groupPartitioner.partitionOnBoundaries(partitionColumns).iterator();
- @Override
- public boolean hasNext()
- {
- if (partitionsIter != null && partitionsIter.hasNext()) {
- return true;
- }
+ boolean keepItGoing = true;
+ while (keepItGoing && partitionsIter.hasNext()) {
+ keepItGoing = receiver.push(partitionsIter.next());
+ }
+ return keepItGoing;
+ }
- return child.hasNext();
- }
-
- @Override
- public void close(boolean cascade)
- {
- if (cascade) {
- child.close(cascade);
- }
+ @Override
+ public void completed()
+ {
+ receiver.completed();
+ }
+ }
+ );
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
index ad64cd1a52..41ba4e804e 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
@@ -22,72 +22,48 @@ package org.apache.druid.query.operator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
/**
- * An Operator interface that intends to align closely with the Operators that
other databases would also tend
- * to be implemented using.
+ * An Operator interface whose implementations are intended to align
relatively closely with the Operators that
+ * other databases also tend to be implemented with. While a lot of
Operator interfaces tend to use a
+ * pull-based orientation, we use a push-based interface. This is to give us
good stacktraces. Because of the
+ * organization of the go() method, the stack traces thrown out of an Operator
will be
+ * 1. All of the go() calls from the top-level Operator down to the leaf
Operator; this part of the stacktrace gives
+ * visibility into all of the actions that we expect the operator chain to
perform
+ * 2. All of the push() calls up until the exception happens, this part of the
stack trace gives us a view of all
+ * of the things that have happened to the data up until the exception was
thrown.
* <p>
- * The lifecycle of an operator is that, after creation, it should be opened,
and then iterated using hasNext() and
- * next(). Finally, when the Operator is no longer useful, it should be
closed.
+ * This "hour glass" structure of the stacktrace is by design. It is very
important that implementations of this
+ * interface never resort to a fluent style, inheritance or other code
structuring that removes the name of the active
+ * Operator from the stacktrace. It should always be possible to find ways to
avoid code duplication and still keep
+ * the Operator's name on the stacktrace.
* <p>
- * Operator's methods mimic the methods of an {@code Iterator}, but it does
not implement {@code Iterator}
- * intentionally. An operator should never be wrapped in an {@code Iterator}.
Any code that does that should be
- * considered a bug and fixed. This is for two reasons:
- * <p>
- * 1. An Operator should never be passed around as an {@code Iterator}. An
Operator must be closed, if an operator
- * gets returned as an {@code Iterator}, the code that sees the {@code
Iterator} loses the knowledge that it's
- * dealing with an Operator and might not close it. Even something like a
{@code CloseableIterator} is an
- * anti-pattern as it's possible to use it in a functional manner with code
that loses track of the fact that it
- * must be closed.
- * 2. To avoid "fluent" style composition of functions on Operators. It is
important that there never be a set of
- * functional primitives for things like map/filter/reduce to "simplify" the
implementation of Operators. This is
- * because such fluency produces really hard to decipher stack traces as the
stacktrace ends up being just a bunch
- * of calls from the scaffolding (map/filter/reduce) and not from the actual
Operator itself. By not implementing
- * {@code Iterator} we are actively increasing the burden of trying to add
such functional operations to the point
- * that hopefully, though code review, we can ensure that we never develop
them. It is infinitely better to preserve
- * the stacktrace and "duplicate" the map/filter/reduce scaffolding code.
+ * The other benefit of the go() method is that it fully encapsulates the
lifecycle of the underlying resources.
+ * This means that it should be possible to use try/finally blocks around
calls to go() in order to ensure that
+ * resources are properly closed.
*/
public interface Operator
{
/**
- * Called to initiate the lifecycle of the Operator. If an operator needs
to checkout resources or anything to do
- * its work, this is probably the place to do it.
+ * Tells the Operator to start doing its work. Data will be pushed into
the Receiver.
*
- * Work should *never* be done in this method, this method only exists to
acquire resources that are known to be
- * needed before doing any work. As a litmus test, if there is ever a call
to `op.next()` inside of this method,
- * then something has been done wrong as that call to `.next()` is actually
doing work. Such code should be moved
- * into being lazily evaluated as part of a call to `.next()`.
+ * @param receiver a receiver that will receive data
*/
- void open();
+ void go(Receiver receiver);
- /**
- * Returns the next RowsAndColumns object that the Operator can produce.
Behavior is undefined if
- * {@link #hasNext} returns false.
- *
- * @return the next RowsAndColumns object that the operator can produce
- */
- RowsAndColumns next();
+ interface Receiver
+ {
+ /**
+ * Used to push data. Return value indicates if more data will be
accepted. If false, push should not
+ * be called anymore.
+ *
+ * @param rac {@link RowsAndColumns} of data
+ * @return a boolean value indicating if more data will be accepted. If
false, push should never be called
+ * anymore
+ */
+ boolean push(RowsAndColumns rac);
- /**
- * Used to identify if it is safe to call {@link #next}
- *
- * @return true if it is safe to call {@link #next}
- */
- boolean hasNext();
-
- /**
- * Closes this Operator. The cascade flag can be used to identify that the
intent is to close this operator
- * and only this operator without actually closing child operators. Other
databases us this sort of functionality
- * with a planner that is watching over all of the objects and force-closes
even if they were closed during normal
- * operations. In Druid, in the data pipeline where this was introduced, we
are guaranteed to always have close
- * called regardless of errors or exceptions during processing, as such, at
time of introduction, there is no
- * call that passes false for cascade.
- * <p>
- * That said, given that this is a common thing for these interfaces for
other databases, we want to preserve the
- * optionality of being able to leverage what they do. As such, we define
the method this way with the belief
- * that it might be used in the future. Semantically, this means that all
implementations of Operators must
- * expect to be closed multiple times. I.e. after being closed, it is an
error for open, next or hasNext to be
- * called, but close can be called any number of times.
- *
- * @param cascade whether to call close on child operators.
- */
- void close(boolean cascade);
+ /**
+ * Used to indicate that no more data will ever come
+ */
+ void completed();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
index 45a3bbd238..06fe9db8e7 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/OperatorSequence.java
@@ -22,12 +22,21 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import java.util.function.Supplier;
+/**
+ * Provides a sequence on top of Operators. The mismatch between pull (Sequence)
and push (Operator) means that, if we
+ * choose to support the Yielder interface, we have to use threading.
Managing extra threads in order to do that
+ * is unfortunate, so, we choose to take a bit of a cop-out approach.
+ *
+ * Specifically, the accumulate method doesn't actually have the same problem
and the query pipeline after the merge
+ * functions is composed of Sequences that all use accumulate instead of
yielder. Thus, if we are certain that
+ * we only use the OperatorSequence in places where toYielder is not called
(i.e. it's only used as the return
+ * value of the merge() calls), then we can get away with only implementing
the accumulate path.
+ */
public class OperatorSequence implements Sequence<RowsAndColumns>
{
private final Supplier<Operator> opSupplier;
@@ -41,24 +50,13 @@ public class OperatorSequence implements
Sequence<RowsAndColumns>
@Override
public <OutType> OutType accumulate(
- OutType initValue,
+ final OutType initValue,
Accumulator<OutType, RowsAndColumns> accumulator
)
{
- Operator op = null;
- try {
- op = opSupplier.get();
- op.open();
- while (op.hasNext()) {
- initValue = accumulator.accumulate(initValue, op.next());
- }
- return initValue;
- }
- finally {
- if (op != null) {
- op.close(true);
- }
- }
+ final MyReceiver<OutType> receiver = new MyReceiver<>(initValue,
accumulator);
+ opSupplier.get().go(receiver);
+ return receiver.getRetVal();
}
@Override
@@ -67,59 +65,38 @@ public class OperatorSequence implements
Sequence<RowsAndColumns>
YieldingAccumulator<OutType, RowsAndColumns> accumulator
)
{
- final Operator op = opSupplier.get();
- try {
- op.open();
-
- while (!accumulator.yielded() && op.hasNext()) {
- initValue = accumulator.accumulate(initValue, op.next());
- }
- if (accumulator.yielded()) {
- OutType finalInitValue = initValue;
- return new Yielder<OutType>()
- {
- private OutType retVal = finalInitValue;
- private boolean done = false;
+ // As mentioned in the class-level javadoc, we skip this implementation
and leave it up to the developer to
+ // only use this class in "safe" locations.
+ throw new UnsupportedOperationException("Cannot convert an Operator to a
Yielder");
+ }
- @Override
- public OutType get()
- {
- return retVal;
- }
+ private static class MyReceiver<OutType> implements Operator.Receiver
+ {
+ private final Accumulator<OutType, RowsAndColumns> accumulator;
+ private OutType retVal;
- @Override
- public Yielder<OutType> next(OutType initValue)
- {
- accumulator.reset();
- retVal = initValue;
- while (!accumulator.yielded() && op.hasNext()) {
- retVal = accumulator.accumulate(retVal, op.next());
- }
- if (!accumulator.yielded()) {
- done = true;
- }
- return this;
- }
+ public MyReceiver(OutType initValue, Accumulator<OutType, RowsAndColumns>
accumulator)
+ {
+ this.accumulator = accumulator;
+ retVal = initValue;
+ }
- @Override
- public boolean isDone()
- {
- return done;
- }
+ public OutType getRetVal()
+ {
+ return retVal;
+ }
- @Override
- public void close()
- {
- op.close(true);
- }
- };
- } else {
- return Yielders.done(initValue, () -> op.close(true));
- }
+ @Override
+ public boolean push(RowsAndColumns rac)
+ {
+ retVal = accumulator.accumulate(retVal, rac);
+ return true;
}
- catch (RuntimeException e) {
- op.close(true);
- throw e;
+
+ @Override
+ public void completed()
+ {
+
}
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
index dc912bfdd6..67a847d81d 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
@@ -20,13 +20,16 @@
package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.CloseableShapeshifter;
import org.apache.druid.segment.Segment;
+import java.io.IOException;
+
public class SegmentToRowsAndColumnsOperator implements Operator
{
private final Segment segment;
- private boolean hasNext = true;
public SegmentToRowsAndColumnsOperator(
Segment segment
@@ -36,33 +39,22 @@ public class SegmentToRowsAndColumnsOperator implements
Operator
}
@Override
- public void open()
+ public void go(Receiver receiver)
{
-
- }
-
- @Override
- public RowsAndColumns next()
- {
- hasNext = false;
-
- RowsAndColumns rac = segment.as(RowsAndColumns.class);
- if (rac != null) {
- return rac;
+ try (final CloseableShapeshifter shifty =
segment.as(CloseableShapeshifter.class)) {
+ if (shifty == null) {
+ throw new ISE("Segment[%s] cannot shapeshift", segment.getClass());
+ }
+
+ RowsAndColumns rac = shifty.as(RowsAndColumns.class);
+ if (rac == null) {
+ throw new ISE("Cannot work with segment of type[%s]",
segment.getClass());
+ }
+ receiver.push(rac);
+ receiver.completed();
+ }
+ catch (IOException e) {
+ throw new RE(e, "Problem closing resources for segment[%s]",
segment.getId());
}
-
- throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
- }
-
- @Override
- public boolean hasNext()
- {
- return hasNext;
- }
-
- @Override
- public void close(boolean cascade)
- {
-
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
index 9dc54f9576..4014a10e6c 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/SequenceOperator.java
@@ -19,21 +19,12 @@
package org.apache.druid.query.operator;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
public class SequenceOperator implements Operator
{
private final Sequence<RowsAndColumns> child;
- private Yielder<RowsAndColumns> yielder;
- private boolean closed = false;
public SequenceOperator(
Sequence<RowsAndColumns> child
@@ -43,45 +34,15 @@ public class SequenceOperator implements Operator
}
@Override
- public void open()
- {
- if (closed) {
- throw new ISE("Operator closed, cannot be re-opened");
- }
- yielder = Yielders.each(child);
- }
-
- @Override
- public RowsAndColumns next()
- {
- if (closed) {
- throw new NoSuchElementException();
- }
- final RowsAndColumns retVal = yielder.get();
- yielder = yielder.next(null);
- return retVal;
- }
-
- @Override
- public boolean hasNext()
- {
- return !closed && !yielder.isDone();
- }
-
- @Override
- public void close(boolean cascade)
+ public void go(Receiver receiver)
{
- if (closed) {
- return;
- }
- try {
- yielder.close();
- }
- catch (IOException e) {
- throw new RE(e, "Exception when closing yielder from Sequence");
- }
- finally {
- closed = true;
- }
+ child.accumulate(
+ null,
+ (accumulated, in) -> {
+ receiver.push(in);
+ return accumulated;
+ }
+ );
+ receiver.completed();
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
index d95f598d39..02bf780f55 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
+++
b/processing/src/main/java/org/apache/druid/query/operator/WindowProcessorOperator.java
@@ -40,28 +40,21 @@ public class WindowProcessorOperator implements Operator
}
@Override
- public void open()
+ public void go(Receiver receiver)
{
- child.open();
- }
-
- @Override
- public RowsAndColumns next()
- {
- return windowProcessor.process(child.next());
- }
-
- @Override
- public boolean hasNext()
- {
- return child.hasNext();
- }
-
- @Override
- public void close(boolean cascade)
- {
- if (cascade) {
- child.close(cascade);
- }
+ child.go(new Receiver()
+ {
+ @Override
+ public boolean push(RowsAndColumns rac)
+ {
+ return receiver.push(windowProcessor.process(rac));
+ }
+
+ @Override
+ public void completed()
+ {
+ receiver.completed();
+ }
+ });
}
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
index 97fe803416..0e597e2934 100644
--- a/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
+++ b/processing/src/main/java/org/apache/druid/segment/ArrayListSegment.java
@@ -106,8 +106,8 @@ public class ArrayListSegment<RowType> implements Segment
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
- if (RowsAndColumns.class.equals(clazz)) {
- return (T) asRowsAndColumns();
+ if (CloseableShapeshifter.class.equals(clazz)) {
+ return (T) new MyCloseableShapeshifter();
}
return null;
}
@@ -120,7 +120,27 @@ public class ArrayListSegment<RowType> implements Segment
private RowsAndColumns asRowsAndColumns()
{
- return new ArrayListRowsAndColumns(rows, rowAdapter, rowSignature);
+ return new ArrayListRowsAndColumns<>(rows, rowAdapter, rowSignature);
}
+ private class MyCloseableShapeshifter implements CloseableShapeshifter
+ {
+
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Nullable
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> T as(Class<T> clazz)
+ {
+ if (RowsAndColumns.class.equals(clazz)) {
+ return (T) asRowsAndColumns();
+ }
+ return null;
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
new file mode 100644
index 0000000000..f482cf04f4
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/CloseableShapeshifter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+
+/**
+ * A CloseableShapeshifter is an interface created to allow Segments to be
used from {@link #as(Class)}, but also to
+ * be able to ensure that any resources used by the object returned from the
{@link #as(Class)} method have proper
+ * management of their lifecycle. This was initially introduced in order to
make it possible for {@link Segment} to
+ * become a {@link org.apache.druid.query.rowsandcols.RowsAndColumns} without
needing to add extra close() methods to
+ * {@link org.apache.druid.query.rowsandcols.RowsAndColumns}.
+ */
+public interface CloseableShapeshifter extends Closeable
+{
+ /**
+ * Asks the Object to return itself as a concrete implementation of a
specific interface. The interface
+ * asked for will tend to be a semantically-meaningful interface.
+ *
+ * @param clazz A class object representing the interface that the calling
code wants a concrete implementation of
+ * @param <T> The interface that the calling code wants a concrete
implementation of
+ * @return A concrete implementation of the interface, or null if there is
no meaningful optimization to be had
+ * through a local implementation of the interface.
+ */
+ @Nullable
+ <T> T as(Class<T> clazz);
+}
diff --git
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
similarity index 57%
copy from
processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
copy to
processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
index dc912bfdd6..b4c52e1950 100644
---
a/processing/src/main/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperator.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/ExceptionalReceiver.java
@@ -19,50 +19,19 @@
package org.apache.druid.query.operator;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
-import org.apache.druid.segment.Segment;
-public class SegmentToRowsAndColumnsOperator implements Operator
+public class ExceptionalReceiver implements Operator.Receiver
{
- private final Segment segment;
- private boolean hasNext = true;
-
- public SegmentToRowsAndColumnsOperator(
- Segment segment
- )
- {
- this.segment = segment;
- }
-
- @Override
- public void open()
- {
-
- }
-
- @Override
- public RowsAndColumns next()
- {
- hasNext = false;
-
- RowsAndColumns rac = segment.as(RowsAndColumns.class);
- if (rac != null) {
- return rac;
- }
-
- throw new ISE("Cannot work with segment of type[%s]", segment.getClass());
- }
-
@Override
- public boolean hasNext()
+ public boolean push(RowsAndColumns rac)
{
- return hasNext;
+ throw new UnsupportedOperationException();
}
@Override
- public void close(boolean cascade)
+ public void completed()
{
-
+ throw new UnsupportedOperationException();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
index dbe2adf830..1d7b68887b 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/InlineScanOperator.java
@@ -49,25 +49,12 @@ public class InlineScanOperator implements Operator
}
@Override
- public void open()
+ public void go(Receiver receiver)
{
- }
-
- @Override
- public RowsAndColumns next()
- {
- return iter.next();
- }
-
- @Override
- public boolean hasNext()
- {
- return iter.hasNext();
- }
-
- @Override
- public void close(boolean cascade)
- {
- iter = null;
+ boolean keepItGoing = true;
+ while (keepItGoing && iter.hasNext()) {
+ keepItGoing = receiver.push(iter.next());
+ }
+ receiver.completed();
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
index 218fde166b..6dbb7898b5 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/NaivePartitioningOperatorTest.java
@@ -29,9 +29,6 @@ import
org.apache.druid.query.rowsandcols.column.IntArrayColumn;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.List;
-
public class NaivePartitioningOperatorTest
{
@Test
@@ -49,30 +46,49 @@ public class NaivePartitioningOperatorTest
InlineScanOperator.make(rac)
);
- op.open();
+ new OperatorTestHelper()
+ .expectRowsAndColumns(
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{0, 0, 0})
+ .expectColumn("unsorted", new int[]{3, 54, 21}),
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{1, 1})
+ .expectColumn("unsorted", new int[]{1, 5}),
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{2})
+ .expectColumn("unsorted", new int[]{54}),
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{4, 4, 4})
+ .expectColumn("unsorted", new int[]{2, 3, 92})
+ )
+ .runToCompletion(op);
+ }
- List<RowsAndColumnsHelper> expectations = Arrays.asList(
- new RowsAndColumnsHelper()
- .expectColumn("sorted", new int[]{0, 0, 0})
- .expectColumn("unsorted", new int[]{3, 54, 21}),
- new RowsAndColumnsHelper()
- .expectColumn("sorted", new int[]{1, 1})
- .expectColumn("unsorted", new int[]{1, 5}),
- new RowsAndColumnsHelper()
- .expectColumn("sorted", new int[]{2})
- .expectColumn("unsorted", new int[]{54}),
- new RowsAndColumnsHelper()
- .expectColumn("sorted", new int[]{4, 4, 4})
- .expectColumn("unsorted", new int[]{2, 3, 92})
+ @Test
+ public void testStopMidStream()
+ {
+ RowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(
+ ImmutableMap.of(
+ "sorted", new IntArrayColumn(new int[]{0, 0, 0, 1, 1, 2, 4, 4, 4}),
+ "unsorted", new IntArrayColumn(new int[]{3, 54, 21, 1, 5, 54, 2,
3, 92})
+ )
);
- for (RowsAndColumnsHelper expectation : expectations) {
- Assert.assertTrue(op.hasNext());
- expectation.validate(op.next());
- }
- Assert.assertFalse(op.hasNext());
+ NaivePartitioningOperator op = new NaivePartitioningOperator(
+ ImmutableList.of("sorted"),
+ InlineScanOperator.make(rac)
+ );
- op.close(true);
+ new OperatorTestHelper()
+ .expectAndStopAfter(
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{0, 0, 0})
+ .expectColumn("unsorted", new int[]{3, 54, 21}),
+ new RowsAndColumnsHelper()
+ .expectColumn("sorted", new int[]{1, 1})
+ .expectColumn("unsorted", new int[]{1, 5})
+ )
+ .runToCompletion(op);
}
@Test
@@ -90,11 +106,17 @@ public class NaivePartitioningOperatorTest
InlineScanOperator.make(rac)
);
- op.open();
boolean exceptionThrown = false;
try {
- op.next();
+ new OperatorTestHelper()
+ .withPushFn(
+ rac1 -> {
+ Assert.fail("I shouldn't be called, an exception should've
been thrown.");
+ return true;
+ }
+ )
+ .runToCompletion(op);
}
catch (ISE ex) {
Assert.assertEquals("Pre-sorted data required, rows[1] and [2] were not
in order", ex.getMessage());
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
index d4b0cdb128..875ef52bd3 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/OperatorSequenceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.operator;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
@@ -30,27 +31,54 @@ import org.junit.Test;
public class OperatorSequenceTest
{
@Test
- public void testSanity()
+ public void testAccumulateButNoYielder()
{
OperatorSequence seq = new OperatorSequence(
() -> InlineScanOperator.make(MapOfColumnsRowsAndColumns.of("hi", new
IntArrayColumn(new int[]{1})))
);
- Assert.assertEquals(1, seq.accumulate(0, (accumulated, in) -> accumulated
+ 1).intValue());
+ final RowsAndColumnsHelper helper = new RowsAndColumnsHelper()
+ .expectColumn("hi", new int[]{1})
+ .allColumnsRegistered();
- Yielder<Integer> yielder = seq.toYielder(0, new
YieldingAccumulator<Integer, RowsAndColumns>()
- {
- @Override
- public Integer accumulate(Integer accumulated, RowsAndColumns in)
+ Assert.assertEquals(
+ 1,
+ seq.accumulate(
+ 0,
+ (accumulated, in) -> {
+ helper.validate(in);
+ return accumulated + 1;
+ }
+ ).intValue()
+ );
+
+ boolean exceptionThrown = false;
+ try {
+ Yielder<Integer> yielder = seq.toYielder(0, new
YieldingAccumulator<Integer, RowsAndColumns>()
{
- yield();
- return accumulated + 1;
- }
- });
- Assert.assertFalse(yielder.isDone());
- Assert.assertEquals(1, yielder.get().intValue());
-
- yielder = yielder.next(0);
- Assert.assertTrue(yielder.isDone());
+ @Override
+ public Integer accumulate(Integer accumulated, RowsAndColumns in)
+ {
+ Assert.fail("This should never be called, because we expect a UOE
before this point");
+ this.yield();
+ helper.validate(in);
+ return accumulated + 1;
+ }
+ });
+
+ // The exception will have been thrown before this point, in which case
one might wonder why the code here
+ // remains. It is because this code is a correct validation of what
should happen if OperatorSequence *did*
+ // implement the Yielder. It's kept for posterity in case we ever
choose to implement it using threads.
+ Assert.assertFalse(yielder.isDone());
+ Assert.assertEquals(1, yielder.get().intValue());
+
+ yielder = yielder.next(0);
+ Assert.assertTrue(yielder.isDone());
+ }
+ catch (UnsupportedOperationException ex) {
+ Assert.assertEquals("Cannot convert an Operator to a Yielder",
ex.getMessage());
+ exceptionThrown = true;
+ }
+ Assert.assertTrue(exceptionThrown);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
new file mode 100644
index 0000000000..b1bace9aa3
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/operator/OperatorTestHelper.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Fluent fixture for testing push-style {@link Operator}s.
+ *
+ * A test configures how pushed {@link RowsAndColumns} are checked (via one of the {@code expect*} methods,
+ * {@link #withPushFn} or {@link #withReceiver}), optionally adds final validations, and then calls
+ * {@link #runToCompletion(Operator)}, which drives the operator and asserts that it signalled completion.
+ */
+public class OperatorTestHelper
+{
+  private Supplier<TestReceiver> receiverSupply;
+  private Consumer<TestReceiver> finalValidation;
+
+  /**
+   * Expects the operator to push exactly {@code helpers.length} objects, validating the i-th pushed object with the
+   * i-th helper. The push function always returns {@code true}.
+   *
+   * @param helpers per-push validators, in the order the pushes are expected
+   * @return this helper, for chaining
+   */
+  public OperatorTestHelper expectRowsAndColumns(RowsAndColumnsHelper... helpers)
+  {
+    // Build the stateful validator inside the supplier so each run gets its own position counter.
+    return withReceiver(
+        () -> new TestReceiver(sequentialValidator(helpers, false))
+    ).withFinalValidation(
+        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
+    );
+  }
+
+  /**
+   * Like {@link #expectRowsAndColumns}, except that the push function returns {@code false} once the last helper has
+   * been used (per the method name, this is the signal for the operator to stop pushing).
+   *
+   * @param helpers per-push validators, in the order the pushes are expected
+   * @return this helper, for chaining
+   */
+  public OperatorTestHelper expectAndStopAfter(RowsAndColumnsHelper... helpers)
+  {
+    return withReceiver(
+        () -> new TestReceiver(sequentialValidator(helpers, true))
+    ).withFinalValidation(
+        testReceiver -> Assert.assertEquals(helpers.length, testReceiver.getNumPushed())
+    );
+  }
+
+  /**
+   * Sets the supplier used to create the receiver for each run.
+   *
+   * @param receiver supplier of receivers
+   * @return this helper, for chaining
+   * @throws ISE if a receiver supplier has already been set
+   */
+  public OperatorTestHelper withReceiver(Supplier<TestReceiver> receiver)
+  {
+    if (this.receiverSupply != null) {
+      throw new ISE("Receiver[%s] already set, cannot set it again[%s].", this.receiverSupply, receiver);
+    }
+    this.receiverSupply = receiver;
+    return this;
+  }
+
+  /**
+   * Adds a validation that runs against the receiver after the operator has completed. Validations accumulate and
+   * run in the order in which they were added.
+   *
+   * @param validator check to run against the finished receiver
+   * @return this helper, for chaining
+   */
+  public OperatorTestHelper withFinalValidation(Consumer<TestReceiver> validator)
+  {
+    if (finalValidation == null) {
+      this.finalValidation = validator;
+    } else {
+      final Consumer<TestReceiver> subValidator = finalValidation;
+      this.finalValidation = (receiver) -> {
+        subValidator.accept(receiver);
+        validator.accept(receiver);
+      };
+    }
+    return this;
+  }
+
+  /**
+   * Uses {@code fn} to handle every push. Note that the same {@code fn} instance is shared by every receiver this
+   * helper creates, so a stateful {@code fn} carries its state across runs.
+   *
+   * @param fn push handler
+   * @return this helper, for chaining
+   */
+  public OperatorTestHelper withPushFn(JustPushMe fn)
+  {
+    return withReceiver(() -> new TestReceiver(fn));
+  }
+
+  /**
+   * Runs {@code op} against a freshly supplied receiver, asserts that the receiver was told it is completed and then
+   * applies any registered final validations.
+   *
+   * @param op operator under test
+   * @return this helper, for chaining
+   * @throws ISE if no receiver has been configured
+   */
+  public OperatorTestHelper runToCompletion(Operator op)
+  {
+    if (receiverSupply == null) {
+      // Fail with an explanation instead of a bare NullPointerException.
+      throw new ISE("No receiver configured, call withReceiver, withPushFn or an expect method first.");
+    }
+    TestReceiver receiver = this.receiverSupply.get();
+    op.go(receiver);
+    Assert.assertTrue(receiver.isCompleted());
+    if (finalValidation != null) {
+      finalValidation.accept(receiver);
+    }
+    return this;
+  }
+
+  /**
+   * Creates a push handler that validates successive pushes against successive helpers.
+   *
+   * @param helpers       validators, applied in order
+   * @param stopAfterLast when true, the handler returns false once the last helper has been consumed
+   */
+  private static JustPushMe sequentialValidator(RowsAndColumnsHelper[] helpers, boolean stopAfterLast)
+  {
+    return new JustPushMe()
+    {
+      private int index = 0;
+
+      @Override
+      public boolean push(RowsAndColumns rac)
+      {
+        // Surface an unexpected extra push as a test failure rather than an ArrayIndexOutOfBoundsException.
+        Assert.assertTrue(
+            "Operator pushed more than the " + helpers.length + " expected RowsAndColumns",
+            index < helpers.length
+        );
+        helpers[index++].validate(rac);
+        return !stopAfterLast || index < helpers.length;
+      }
+    };
+  }
+
+  /**
+   * Handler for a single pushed {@link RowsAndColumns}; its return value is passed back to the operator as the result
+   * of {@link Operator.Receiver#push}.
+   */
+  @FunctionalInterface
+  public interface JustPushMe
+  {
+    boolean push(RowsAndColumns rac);
+  }
+
+  /**
+   * Receiver that delegates pushes to a {@link JustPushMe} while counting them, and records whether
+   * {@link #completed()} has been called.
+   */
+  public static class TestReceiver implements Operator.Receiver
+  {
+    private final JustPushMe pushFn;
+
+    private final AtomicLong numPushed = new AtomicLong();
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+
+    public TestReceiver(JustPushMe pushFn)
+    {
+      this.pushFn = pushFn;
+    }
+
+    @Override
+    public boolean push(RowsAndColumns rac)
+    {
+      // Count first so the push is recorded even if the delegate throws.
+      numPushed.incrementAndGet();
+      return pushFn.push(rac);
+    }
+
+    public boolean isCompleted()
+    {
+      return completed.get();
+    }
+
+    /**
+     * Marks this receiver as completed.
+     *
+     * @throws ISE if called more than once
+     */
+    @Override
+    public void completed()
+    {
+      if (!completed.compareAndSet(false, true)) {
+        throw new ISE("complete called more than once!? Why.");
+      }
+    }
+
+    public long getNumPushed()
+    {
+      return numPushed.get();
+    }
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
new file mode 100644
index 0000000000..0cd1f1b588
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/query/operator/SegmentToRowsAndColumnsOperatorTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.operator;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.apache.druid.segment.ArrayListSegment;
+import org.apache.druid.segment.CloseableShapeshifter;
+import org.apache.druid.segment.TestSegmentForAs;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.timeline.SegmentId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for {@link SegmentToRowsAndColumnsOperator}: the successful path over an {@link ArrayListSegment} and the
+ * failure paths exercised through {@link TestSegmentForAs}.
+ */
+public class SegmentToRowsAndColumnsOperatorTest
+{
+  /**
+   * A two-row segment with a (LONG, DOUBLE, STRING) signature should be pushed as exactly one RowsAndColumns holding
+   * the three expected columns.
+   */
+  @Test
+  public void testSanity()
+  {
+    ArrayList<Object[]> rows = Lists.newArrayList(
+        new Object[]{1, 2, "a"},
+        new Object[]{1, 2, "b"}
+    );
+
+    ArrayListSegment<Object[]> segment = new ArrayListSegment<>(
+        SegmentId.dummy("test"),
+        rows,
+        // Column names are the array positions, so parse the name to find the index to read.
+        columnName -> objects -> objects[Integer.parseInt(columnName)],
+        RowSignature.builder()
+                    .add("0", ColumnType.LONG)
+                    .add("1", ColumnType.DOUBLE)
+                    .add("2", ColumnType.STRING).build()
+    );
+    final SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(segment);
+
+    new OperatorTestHelper()
+        .expectRowsAndColumns(
+            new RowsAndColumnsHelper()
+                .expectColumn("0", new long[]{1, 1})
+                .expectColumn("1", new double[]{2, 2})
+                .expectColumn("2", ColumnType.STRING, "a", "b")
+                .allColumnsRegistered()
+        )
+        .runToCompletion(op);
+  }
+
+  /**
+   * When the segment returns null for {@link CloseableShapeshifter}, running the operator must fail with an ISE
+   * carrying the "cannot shapeshift" message.
+   */
+  @Test
+  public void testNotShapeshiftable()
+  {
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return null;
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      op.go(new ExceptionalReceiver());
+    }
+    catch (ISE e) {
+      // JUnit argument order is (expected, actual).
+      Assert.assertEquals("Segment[class org.apache.druid.segment.TestSegmentForAs] cannot shapeshift", e.getMessage());
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+  }
+
+  /**
+   * When the shapeshifter exists but returns null for {@link RowsAndColumns}, running the operator must fail with
+   * an ISE and the shapeshifter must still have been closed.
+   */
+  @Test
+  public void testCanShiftButNotToARAC()
+  {
+    AtomicBoolean closed = new AtomicBoolean(false);
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return new CloseableShapeshifter()
+          {
+            @Nullable
+            @Override
+            public <T> T as(Class<T> clazz)
+            {
+              Assert.assertEquals(RowsAndColumns.class, clazz);
+              return null;
+            }
+
+            @Override
+            public void close()
+            {
+              closed.set(true);
+            }
+          };
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      op.go(new ExceptionalReceiver());
+    }
+    catch (ISE e) {
+      Assert.assertEquals(
+          "Cannot work with segment of type[class org.apache.druid.segment.TestSegmentForAs]",
+          e.getMessage()
+      );
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+    Assert.assertTrue(closed.get());
+  }
+
+  /**
+   * When closing the shapeshifter throws an IOException, the run must fail with an RE naming the segment, and
+   * close() must have been invoked. Any RowsAndColumns pushed along the way must be the exact instance returned by
+   * the shapeshifter.
+   */
+  @Test
+  public void testExceptionWhileClosing()
+  {
+    final MapOfColumnsRowsAndColumns expectedRac =
+        MapOfColumnsRowsAndColumns.of("0", new IntArrayColumn(new int[]{0, 1}));
+    AtomicBoolean closed = new AtomicBoolean(false);
+
+    SegmentToRowsAndColumnsOperator op = new SegmentToRowsAndColumnsOperator(
+        new TestSegmentForAs(SegmentId.dummy("test"), aClass -> {
+          Assert.assertEquals(CloseableShapeshifter.class, aClass);
+          return new CloseableShapeshifter()
+          {
+            @SuppressWarnings("unchecked")
+            @Nullable
+            @Override
+            public <T> T as(Class<T> clazz)
+            {
+              Assert.assertEquals(RowsAndColumns.class, clazz);
+              return (T) expectedRac;
+            }
+
+            @Override
+            public void close() throws IOException
+            {
+              closed.set(true);
+              throw new IOException("ain't no thang");
+            }
+          };
+        })
+    );
+
+    boolean exceptionThrown = false;
+    try {
+      new OperatorTestHelper()
+          .withPushFn(rac -> {
+            Assert.assertSame(expectedRac, rac);
+            return true;
+          })
+          .runToCompletion(op);
+    }
+    catch (RE e) {
+      Assert.assertEquals(
+          "Problem closing resources for segment[test_-146136543-09-08T08:23:32.096Z_146140482-04-24T15:36:27.903Z_dummy_version]",
+          e.getMessage()
+      );
+      exceptionThrown = true;
+    }
+    Assert.assertTrue(exceptionThrown);
+    Assert.assertTrue(closed.get());
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
index d980287972..02070c2436 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/SequenceOperatorTest.java
@@ -38,19 +38,17 @@ public class SequenceOperatorTest
MapOfColumnsRowsAndColumns.of("hi", new IntArrayColumn(new int[]{1}))
)));
- op.open();
-
- RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
- .expectColumn("hi", new int[]{1})
- .allColumnsRegistered();
-
- expectations.validate(op.next());
- Assert.assertTrue(op.hasNext());
-
- expectations.validate(op.next());
- Assert.assertFalse(op.hasNext());
-
- op.close(true);
- op.close(false);
+ new OperatorTestHelper()
+ .withPushFn(
+ rac -> {
+ new RowsAndColumnsHelper()
+ .expectColumn("hi", new int[]{1})
+ .allColumnsRegistered()
+ .validate(rac);
+ return true;
+ }
+ )
+ .withFinalValidation(testReceiver -> Assert.assertEquals(2,
testReceiver.getNumPushed()))
+ .runToCompletion(op);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
index a6f1433201..73d3060b80 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
@@ -57,10 +57,13 @@ public class WindowProcessorOperatorTest
InlineScanOperator.make(rac)
);
- op.open();
- Assert.assertTrue(op.hasNext());
- Assert.assertSame(rac, op.next());
- Assert.assertFalse(op.hasNext());
- op.close(true);
+ new OperatorTestHelper()
+ .withPushFn(
+ rowsAndColumns -> {
+ Assert.assertSame(rac, rowsAndColumns);
+ return true;
+ }
+ )
+ .runToCompletion(op);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
index 7a5d24af03..515d800cb4 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/window/RowsAndColumnsHelper.java
@@ -96,6 +96,13 @@ public class RowsAndColumnsHelper
return this;
}
+ /**
+ * Registers the expected values for column {@code col} of the given {@code type}, taking the values as varargs.
+ * The number of values supplied is passed to {@code columnHelper} as the column's expected size.
+ *
+ * @return this helper, for chaining
+ */
+ public RowsAndColumnsHelper expectColumn(String col, ColumnType type, Object... expectedVals)
+ {
+ columnHelper(col, expectedVals.length, type).setExpectation(expectedVals);
+ return this;
+ }
+ }
+
public RowsAndColumnsHelper expectColumn(String col, Object[] expectedVals,
ColumnType type)
{
final ColumnHelper helper = columnHelper(col, expectedVals.length, type);
diff --git
a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
index 43ab540065..706584a1c7 100644
---
a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
+++
b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
@@ -33,8 +33,6 @@ import
org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.junit.Assert;
import org.junit.Test;
-import javax.annotation.Nullable;
-
@SuppressWarnings("unchecked")
public class WindowFramedAggregateProcessorTest
{
@@ -52,9 +50,8 @@ public class WindowFramedAggregateProcessorTest
"yay", new IntArrayColumn(new int[]{1, 2, 3})
));
- final RowsAndColumns processed = proc.process(new
AsOnlyTestRowsAndColumns(theFrame, theAggs, rac)
+ final RowsAndColumns processed = proc.process(new
AsOnlyTestRowsAndColumns()
{
- @Nullable
@Override
public <T> T as(Class<T> clazz)
{
diff --git
a/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
b/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
index 685fd6f864..035e06369b 100644
---
a/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
+++
b/processing/src/test/java/org/apache/druid/query/rowsandcols/AsOnlyTestRowsAndColumns.java
@@ -20,29 +20,12 @@
package org.apache.druid.query.rowsandcols;
import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.operator.window.WindowFrame;
import org.apache.druid.query.rowsandcols.column.Column;
import java.util.Collection;
public abstract class AsOnlyTestRowsAndColumns implements RowsAndColumns
{
- private final WindowFrame theFrame;
- private final AggregatorFactory[] theAggs;
- private final MapOfColumnsRowsAndColumns rac;
-
- public AsOnlyTestRowsAndColumns(
- WindowFrame theFrame,
- AggregatorFactory[] theAggs,
- MapOfColumnsRowsAndColumns rac
- )
- {
- this.theFrame = theFrame;
- this.theAggs = theAggs;
- this.rac = rac;
- }
-
@Override
public Collection<String> getColumnNames()
{
diff --git
a/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java
b/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java
new file mode 100644
index 0000000000..2a910d0d42
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/TestSegmentForAs.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.function.Function;
+
+/**
+ * Test-only {@link Segment} whose {@link #as(Class)} lookups are answered by a caller-supplied function. The
+ * {@code asQueryableIndex()} and {@code asStorageAdapter()} accessors are routed through {@link #as(Class)} too,
+ * so the function sees every requested class.
+ */
+@SuppressWarnings("rawtypes")
+public class TestSegmentForAs implements Segment
+{
+  private final SegmentId segmentId;
+  private final Function<Class, Object> shapeshiftFn;
+
+  /**
+   * @param id   identifier reported by {@link #getId()}; its interval is reported by {@link #getDataInterval()}
+   * @param asFn function invoked with the requested class on every {@link #as(Class)} call
+   */
+  public TestSegmentForAs(SegmentId id, Function<Class, Object> asFn)
+  {
+    this.segmentId = id;
+    this.shapeshiftFn = asFn;
+  }
+
+  @Override
+  public SegmentId getId()
+  {
+    return segmentId;
+  }
+
+  @Override
+  public Interval getDataInterval()
+  {
+    return segmentId.getInterval();
+  }
+
+  @Nullable
+  @Override
+  public QueryableIndex asQueryableIndex()
+  {
+    return as(QueryableIndex.class);
+  }
+
+  @Override
+  public StorageAdapter asStorageAdapter()
+  {
+    return as(StorageAdapter.class);
+  }
+
+  /**
+   * Returns whatever the supplied function produces for {@code clazz}, cast unchecked to {@code T}.
+   */
+  @SuppressWarnings("unchecked")
+  @Nullable
+  @Override
+  public <T> T as(@Nonnull Class<T> clazz)
+  {
+    final Object shifted = shapeshiftFn.apply(clazz);
+    return (T) shifted;
+  }
+
+  /**
+   * Nothing to release; this segment holds no resources of its own.
+   */
+  @Override
+  public void close()
+  {
+    // intentionally empty
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]