Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/1059#discussion_r155939394
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
---
@@ -228,4 +228,20 @@ public WritableBatch getWritableBatch() {
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+
+ public void drainStream(IterOutcome stream, int input, RecordBatch batch) {
+ if (stream == IterOutcome.OK_NEW_SCHEMA || stream == IterOutcome.OK) {
+ for (final VectorWrapper<?> wrapper : batch) {
+ wrapper.getValueVector().clear();
+ }
+ batch.kill(true);
+ stream = next(input, batch);
+ while (stream == IterOutcome.OK_NEW_SCHEMA || stream == IterOutcome.OK) {
+ for (final VectorWrapper<?> wrapper : batch) {
+ wrapper.getValueVector().clear();
+ }
+ stream = next(input, batch);
+ }
--- End diff --
Let's think a bit about this. Each fragment is synchronous and resides in a
single thread. The `kill()` call will tell the upstream batch that we don't
want any more batches. Under what conditions would the upstream operator ignore
our request and still send us more batches?
Given that the upstream batch is in the same thread, there are no race
conditions. That is, the upstream can't be busy producing batches and
adding them to a queue. Why? It runs in the same thread, and that thread is
executing here.
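To make the single-thread argument concrete, here is a toy sketch of a pull-based upstream. It is not Drill's `RecordBatch` API: `PullModelSketch`, `ToyUpstream`, and `Outcome` are hypothetical names invented for illustration. The point is only that a batch comes into existence inside the downstream's call to `next()`, so nothing can be produced between `kill()` and the following `next()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a single-threaded, pull-based operator chain.
// All names are hypothetical; this is not Drill's RecordBatch API.
public class PullModelSketch {
  enum Outcome { OK, NONE }

  /** Upstream that produces a batch only when the downstream calls next(). */
  static class ToyUpstream {
    private final Deque<String> pending = new ArrayDeque<>();
    private boolean killed;
    int produced; // number of batches handed downstream so far

    ToyUpstream(int batches) {
      for (int i = 0; i < batches; i++) {
        pending.add("batch-" + i);
      }
    }

    /** Runs on the caller's thread; there is no background producer. */
    Outcome next() {
      if (killed || pending.isEmpty()) {
        return Outcome.NONE;
      }
      pending.poll();
      produced++;
      return Outcome.OK;
    }

    /** After this call, next() never returns OK again. */
    void kill() {
      killed = true;
      pending.clear();
    }
  }

  public static void main(String[] args) {
    ToyUpstream upstream = new ToyUpstream(5);
    upstream.next();                       // pull one batch
    upstream.kill();                       // ask for no more
    System.out.println(upstream.next());   // prints NONE
    System.out.println(upstream.produced); // prints 1
  }
}
```

If the real operators behave like this toy, the loop after `kill()` in the diff would never see a second batch.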
If the upstream is a network receiver, then the network layer should handle
the race conditions so we don't expose those issues to the entire operator
stack.
Given all of this, I wonder: was this tested? Can it be tested? How can we
verify that the mechanism actually works, other than by trying it in
production? Is there any way to unit test this?
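One possible shape for such a test, offered only as a sketch: drive the drain logic against a scripted upstream that can be told to keep sending batches after `kill()`. Everything below (`DrainTestSketch`, `Upstream`, `ScriptedUpstream`, `drain`) is hypothetical and stands in for the real classes. A real test would need a mock `RecordBatch` and whatever operator test fixtures the project provides.

```java
// Hypothetical test harness; none of these types are Drill classes.
public class DrainTestSketch {
  enum Outcome { OK, NONE }

  interface Upstream {
    Outcome next();
    void kill();
  }

  /** Scripted upstream that still delivers `extraAfterKill` batches after kill(). */
  static class ScriptedUpstream implements Upstream {
    private int remaining;
    private final int extraAfterKill;

    ScriptedUpstream(int batches, int extraAfterKill) {
      this.remaining = batches;
      this.extraAfterKill = extraAfterKill;
    }

    @Override
    public Outcome next() {
      if (remaining > 0) {
        remaining--;
        return Outcome.OK;
      }
      return Outcome.NONE;
    }

    @Override
    public void kill() {
      // A well-behaved upstream uses extraAfterKill == 0.
      remaining = Math.min(remaining, extraAfterKill);
    }
  }

  /**
   * Simplified stand-in for the drain loop in the diff: kill once, then
   * pull until the upstream stops. Returns the number of batches that
   * arrived after kill() and had to be discarded.
   */
  static int drain(Upstream upstream) {
    upstream.kill();
    int discarded = 0;
    while (upstream.next() == Outcome.OK) {
      discarded++;
    }
    return discarded;
  }

  public static void main(String[] args) {
    // Well-behaved upstream: honors kill() immediately.
    System.out.println(drain(new ScriptedUpstream(10, 0))); // prints 0
    // Stubborn upstream: two more batches arrive after kill().
    System.out.println(drain(new ScriptedUpstream(10, 2))); // prints 2
  }
}
```

A test along these lines would at least show which case the drain loop is meant to handle, and whether any real operator ever behaves like the second one.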
---