This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new efb231d DRILL-8115: Adds LIMIT pushdown support in the scan framework (#2441)
efb231d is described below
commit efb231ddb5a1f09e7a4431cab48e6328292d0e3f
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Feb 1 01:54:24 2022 -0800
DRILL-8115: Adds LIMIT pushdown support in the scan framework (#2441)
* Adds LIMIT pushdown support in the scan framework
Modifies EVF and the scan framework to enforce LIMIT
pushdown across readers.
* Fix build issues
* Fixes from review comments
---
.../exec/physical/impl/scan/RowBatchReader.java | 62 ++--
.../physical/impl/scan/ScanOperatorEvents.java | 4 -
.../exec/physical/impl/scan/ScanOperatorExec.java | 24 +-
.../impl/scan/framework/ManagedScanFramework.java | 3 +
.../impl/scan/framework/SchemaNegotiator.java | 6 +
.../impl/scan/framework/SchemaNegotiatorImpl.java | 6 +
.../impl/scan/framework/ShimBatchReader.java | 27 +-
.../scan/project/ReaderSchemaOrchestrator.java | 27 +-
.../impl/scan/project/ScanSchemaOrchestrator.java | 29 +-
.../impl/scan/v3/ScanLifecycleBuilder.java | 17 ++
.../impl/scan/v3/lifecycle/ReaderLifecycle.java | 39 ++-
.../impl/scan/v3/lifecycle/ScanLifecycle.java | 18 +-
.../scan/v3/lifecycle/SchemaNegotiatorImpl.java | 5 +
.../exec/physical/resultSet/ResultSetLoader.java | 23 +-
.../exec/physical/resultSet/RowSetLoader.java | 1 +
.../resultSet/impl/ResultSetLoaderImpl.java | 72 ++++-
.../resultSet/impl/ResultSetOptionBuilder.java | 10 +
.../physical/resultSet/impl/RowSetLoaderImpl.java | 3 +
.../physical/resultSet/impl/WriterIndexImpl.java | 2 +-
.../exec/store/mock/ExtendedMockBatchReader.java | 13 +-
.../impl/scan/BaseScanOperatorExecTest.java | 7 +-
.../physical/impl/scan/TestScanOperExecBasics.java | 1 -
.../impl/scan/TestScanOperExecEarlySchema.java | 1 -
.../physical/impl/scan/TestScanOperExecLimit.java | 270 +++++++++++++++++
.../impl/scan/TestScanOperExecOverflow.java | 3 -
.../exec/physical/impl/scan/v3/TestScanLimit.java | 277 ++++++++++++++++++
.../scan/v3/lifecycle/BaseTestScanLifecycle.java | 2 +
.../scan/v3/lifecycle/TestScanLifecycleLimit.java | 321 +++++++++++++++++++++
.../resultSet/impl/TestResultSetLoaderLimits.java | 109 ++++++-
.../impl/TestResultSetLoaderProtocol.java | 7 +-
.../exec/vector/accessor/writer/WriterEvents.java | 27 +-
31 files changed, 1273 insertions(+), 143 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index 61de584..b52b116 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.record.VectorContainer;
* vectors across batches, etc.
* <p>
* Note that this interface reads a <b>batch</b> of rows, not
- * a single row. (The original <tt>RecordReader</tt> could be
+ * a single row. (The original {@code RecordReader} could be
* confusing in this aspect.)
* <p>
* The expected lifecycle is:
@@ -49,15 +49,15 @@ import org.apache.drill.exec.record.VectorContainer;
* is late-schema), return null for the output and -1 for the schema
* version. Else, return non-negative for the version number and
* an empty batch with a schema.</li>
- * <li>{@link #next()}</tt> to retrieve the next record batch.
+ * <li>{@link #next()}} to retrieve the next record batch.
* Return true if a batch is available, false if EOF. There is no
- * requirement to return a batch; the first call to <tt>next()</tt>
- * can return <tt>false</tt> if no data is available.</tt>
+ * requirement to return a batch; the first call to {@code next()}
+ * can return {@code false} if no data is available.}
* <li>{@link #output()} and {@link #schemaVersion()} to obtain the
* batch of records read, and to detect if the version of the schema
* is different from the previous batch.</li>
* <li>{@link #close()} when the reader is no longer needed. This
- * may occur before <tt>next()</tt> returns <tt>false</tt> if an
+ * may occur before {@code next()} returns {@code false} if an
* error occurs or a limit is reached.</li>
* </ul>
* Although the reader should not care, the scanner must return an
@@ -65,8 +65,8 @@ import org.apache.drill.exec.record.VectorContainer;
* The scan operator handles this transparently:
* <ul>
* <li>If the reader is early-schema, then the reader itself returns
- * an empty batch (via <tt>output()</tt), with only schema after the call
- * to <tt>open()</tt>. The scanner sends this empty batch downstream
+ * an empty batch (via {@code output()</tt), with only schema after the call
+ * to {@code open()}. The scanner sends this empty batch downstream
* directly.</li>
* <li>If the reader is late-schema, then the reader will read the first
* batch from the reader. It will save that batch, extract the schema,
@@ -81,7 +81,6 @@ import org.apache.drill.exec.record.VectorContainer;
* from any method. A {@link UserException} is preferred to provide
* detailed information about the source of the problem.
*/
-
public interface RowBatchReader {
/**
@@ -89,12 +88,11 @@ public interface RowBatchReader {
*
* @return display name for errors
*/
-
String name();
/**
* Setup the record reader. Called just before the first call
- * to <tt>next()</tt>. Allocate resources here, not in the constructor.
+ * to {@code next()}. Allocate resources here, not in the constructor.
* Example: open files, allocate buffers, etc.
*
* @return true if the reader is open and ready to read (possibly no)
@@ -102,10 +100,9 @@ public interface RowBatchReader {
* but the scanner should not fail, it should move onto another reader
*
* @throws RuntimeException for "hard" errors that should terminate
- * the query. <tt>UserException</tt> preferred to explain the problem
+ * the query. {@code UserException} preferred to explain the problem
* better than the scan operator can by guessing at the cause
*/
-
boolean open();
/**
@@ -121,17 +118,16 @@ public interface RowBatchReader {
* @return true if this reader can (and has) defined an empty batch
* to describe the schema, false otherwise
*/
-
boolean defineSchema();
/**
* Read the next batch. Reading continues until either EOF,
* or until the mutator indicates that the batch is full.
* The batch is considered valid if it is non-empty. Returning
- * <tt>true</tt> with an empty batch is valid, and is helpful on
+ * {@code true} with an empty batch is valid, and is helpful on
* the very first batch (returning schema only.) An empty batch
- * with a <tt>false</tt> return code indicates EOF and the batch
- * will be discarded. A non-empty batch along with a <tt>false</tt>
+ * with a {@code false} return code indicates EOF and the batch
+ * will be discarded. A non-empty batch along with a {@code false}
* return result indicates a final, valid batch, but that EOF was
* reached and no more data is available.
* <p>
@@ -139,14 +135,13 @@ public interface RowBatchReader {
* final batch just to find out that no more data is available;
* it allows EOF to be returned along with the final batch.
*
- * @return <tt>true</tt> if more data may be available (and so
- * <tt>next()</tt> should be called again, <tt>false</tt> to indicate
+ * @return {@code true} if more data may be available (and so
+ * {@code next()} should be called again, {@code false} to indicate
* that EOF was reached
*
- * @throws RuntimeException (<tt>UserException</tt> preferred) if an
+ * @throws RuntimeException ({@code UserException} preferred) if an
* error occurs that should fail the query.
*/
-
boolean next();
/**
@@ -158,7 +153,7 @@ public interface RowBatchReader {
* empty batch with the schema set. The scanner will return this schema
* downstream to inform other operators of the schema.</li>
* <li>Directly after a successful call to {@link next()} to retrieve
- * the batch produced by that call. (No call is made if <tt>next()</tt>
+ * the batch produced by that call. (No call is made if {@code next()}
* returns false.</li>
* </ul>
* Note that most operators require the same vectors be present in
@@ -166,10 +161,9 @@ public interface RowBatchReader {
* container, and same set of vectors, on each call.
*
* @return a vector container, with the record count and schema
- * set, that announces the schema after <tt>open()</tt> (optional)
- * or returns rows read after <tt>next()</tt> (required)
+ * set, that announces the schema after {@code open()} (optional)
+ * or returns rows read after {@code next()} (required)
*/
-
VectorContainer output();
/**
@@ -183,8 +177,8 @@ public interface RowBatchReader {
* <li>The number increases if a batch has a different schema than the
* previous batch.</li>
* </ul>
- * Numbers increment (or not) on calls to <tt>next()</tt>. Thus Two successive
- * calls to this method should return the same number if no <tt>next()</tt>
+ * Numbers increment (or not) on calls to {@code next()}. Thus Two successive
+ * calls to this method should return the same number if no {@code next()}
* call lies between.
* <p>
* If the reader can return a schema on open (so-called "early-schema), then
@@ -192,29 +186,27 @@ public interface RowBatchReader {
* happens to be empty (such as reading an empty file.)
* <p>
* However, if the reader cannot return a schema on open (so-called "late
- * schema"), then this method must return -1 (and <tt>output()</tt> must
+ * schema"), then this method must return -1 (and {@code output()} must
* return null) to indicate now schema is available when called before the
- * first call to <tt>next()</tt>.
+ * first call to {@code next()}.
* <p>
- * No calls will be made to this method before <tt>open()</tt> after
- * <tt>close()<tt> or after <tt>next()</tt> returns false. The implementation
+ * No calls will be made to this method before {@code open()} after
+ * {@code close(){@code or after {@code next()} returns false. The implementation
* is thus not required to handle these cases.
*
* @return the schema version, or -1 if no schema version is yet available
*/
-
int schemaVersion();
/**
* Release resources. Called just after a failure, when the scanner
- * is cancelled, or after <tt>next()</tt> returns EOF. Release
+ * is cancelled, or after {@code next()} returns EOF. Release
* all resources and close files. Guaranteed to be called if
- * <tt>open()</tt> returns normally; will not be called if <tt>open()</tt>
+ * {@code open()} returns normally; will not be called if {@code open()}
* throws an exception.
*
- * @throws RutimeException (<tt>UserException</tt> preferred) if an
+ * @throws RutimeException ({@code UserException} preferred) if an
* error occurs that should fail the query.
*/
-
void close();
}
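The lifecycle spelled out in the Javadoc above (open, repeated next/output, close in all cases) can be sketched as a driver loop. This is an illustrative sketch only, not Drill's scan operator: `ReaderLoop`, `FakeReader`, and its integer "batches" are invented stand-ins for a `RowBatchReader` and its `VectorContainer` output.

```java
// Hypothetical driver for the RowBatchReader contract described above.
class ReaderLoop {

  /** Stand-in reader: each array element is the row count of one batch. */
  static class FakeReader {
    private final int[] batches;
    private int index;
    private int lastRowCount = -1;

    FakeReader(int[] batches) { this.batches = batches; }

    /** Soft failure (no data) returns false; the scan moves to the next reader. */
    boolean open() { return batches.length > 0; }

    /** Returns false at EOF; the final batch may arrive together with EOF. */
    boolean next() {
      if (index >= batches.length) {
        return false;
      }
      lastRowCount = batches[index++];
      return index < batches.length;   // report EOF along with the final batch
    }

    /** Stand-in for output(): the row count of the batch just read. */
    int output() { return lastRowCount; }

    void close() { lastRowCount = -1; }
  }

  /** Drive one reader to completion, totalling rows read. */
  static long drive(FakeReader reader) {
    long total = 0;
    if (!reader.open()) {
      return total;                    // soft failure: skip, don't fail the query
    }
    try {
      boolean more = true;
      while (more) {
        more = reader.next();          // a valid batch may accompany EOF
        if (reader.output() > 0) {
          total += reader.output();
        }
      }
    } finally {
      reader.close();                  // guaranteed once open() returned normally
    }
    return total;
  }
}
```

Note how the EOF-with-final-batch convention avoids one extra `next()` call just to discover that no data remains.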
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
index 04b2c7e..9aceb32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.ops.OperatorContext;
* across scanners. See {@link ScanSchemaOrchestrator} for the managed
* framework.
*/
-
public interface ScanOperatorEvents {
/**
@@ -51,7 +50,6 @@ public interface ScanOperatorEvents {
*
* @param context the operator context for the scan operator
*/
-
void bind(OperatorContext context);
/**
@@ -67,13 +65,11 @@ public interface ScanOperatorEvents {
* @return a batch reader for one of the scan elements within the
* scan physical plan for this scan operator
*/
-
RowBatchReader nextReader();
/**
* Called when the scan operator itself is closed. Indicates that no more
* readers are available.
*/
-
void close();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 699eb1f..e1038d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -32,6 +32,24 @@ import org.slf4j.LoggerFactory;
* Implementation of the revised scan operator that uses a mutator aware of
* batch sizes. This is the successor to {@link ScanBatch} and should be used
* by all new scan implementations.
+ * <p>
+ * The basic concept is to split the scan operator into layers:
+ * <ul>
+ * <li>The {@code OperatorRecordBatch} which implements Drill's Volcano-like
+ * protocol.</li>
+ * <li>The scan operator "wrapper" (this class) which implements actions for the
+ * operator record batch specifically for scan. It iterates over readers,
+ * delegating semantic work to other classes.</li>
+ * <li>The implementation of per-reader semantics in the two EVF versions and
+ * other ad-hoc implementations.</li>
+ * <li>The result set loader and related classes which pack values into
+ * value vectors.</li>
+ * <li>Value vectors, which store the data.</li>
+ * </ul>
+ * <p>
+ * The layered format can be confusing. However, each layer is somewhat
+ * complex, so dividing the work into layers keeps the overall complexity
+ * somewhat under control.
*
* <h4>Scanner Framework</h4>
*
@@ -116,13 +134,11 @@ public class ScanOperatorExec implements OperatorExec {
* The scan has been started, but next() has not yet been
* called.
*/
-
START,
/**
* A reader is active and has more batches to deliver.
*/
-
READER,
/**
@@ -131,14 +147,13 @@ public class ScanOperatorExec implements OperatorExec {
* was returned from next(). The next call to next() will
* be the last.
*/
-
EMPTY,
/**
* All readers are complete; no more batches to deliver.
+ * Or, hit the limit pushed down to the scan.
* close() is not yet called.
*/
-
END,
/**
@@ -146,7 +161,6 @@ public class ScanOperatorExec implements OperatorExec {
* states. No further calls to next() allowed. Waiting
* for the call to close().
*/
-
FAILED,
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 860912c..dc456db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -193,6 +193,9 @@ public class ManagedScanFramework implements ScanOperatorEvents {
@Override
public RowBatchReader nextReader() {
+ if (scanOrchestrator.atLimit()) {
+ return null;
+ }
ManagedReader<? extends SchemaNegotiator> reader = readerFactory.next();
return reader == null ? null : new ShimBatchReader(this, reader);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index b2a793d..0a112b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -165,6 +165,12 @@ public interface SchemaNegotiator {
void batchSize(int maxRecordsPerBatch);
/**
+ * Push down a LIMIT into the scan. This is a per-reader limit,
+ * not an overall scan limit.
+ */
+ void limit(long limit);
+
+ /**
* Build the schema, plan the required projections and static
* columns and return a loader used to populate value vectors.
* If the select list includes a subset of table columns, then
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 653df53..5139ca7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -64,6 +64,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
protected TupleMetadata tableSchema;
protected boolean isSchemaComplete;
protected int batchSize = ValueVector.MAX_ROW_COUNT;
+ protected long limit = -1;
public SchemaNegotiatorImpl(ManagedScanFramework framework) {
this.framework = framework;
@@ -138,6 +139,11 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator {
return framework.builder.userName;
}
+ @Override
+ public void limit(long limit) {
+ this.limit = limit;
+ }
+
/**
* Callback from the schema negotiator to build the schema from information from
* both the table and scan operator. Returns the result set loader to be used
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index 3d750c6..487950c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -54,7 +54,8 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
*/
private boolean eof;
- public ShimBatchReader(ManagedScanFramework manager, ManagedReader<? extends SchemaNegotiator> reader) {
+ public ShimBatchReader(ManagedScanFramework manager,
+ ManagedReader<? extends SchemaNegotiator> reader) {
this.framework = manager;
this.reader = reader;
readerOrchestrator = manager.scanOrchestrator().startReader();
@@ -72,7 +73,7 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
// Build and return the result set loader to be used by the reader.
- if (! framework.open(this)) {
+ if (!framework.open(this)) {
// If we had a soft failure, then there should be no schema.
// The reader should not have negotiated one. Not a huge
@@ -113,13 +114,21 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
// The reader may report EOF, but the result set loader might
// have a lookahead row.
- if (eof && ! tableLoader.hasRows()) {
+ if (eof && !tableLoader.hasRows()) {
+ return false;
+ }
+
+ // Hit the per-reader limit?
+ if (tableLoader.atLimit()) {
return false;
}
// Prepare for the batch.
- readerOrchestrator.startBatch();
+ if (!readerOrchestrator.startBatch()) {
+ eof = true;
+ return false;
+ }
// Read the batch. The reader should report EOF if it hits the
// end of data, even if the reader returns rows. This will prevent allocating
@@ -127,8 +136,8 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
// already reported EOF. In that case, we're just processing any last
// lookahead row in the result set loader.
- if (! eof) {
- eof = ! reader.next();
+ if (!eof) {
+ eof = !reader.next();
}
// Add implicit columns, if any.
@@ -136,13 +145,13 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
// Having a correct row count, even if 0, is important to
// the scan operator.
- readerOrchestrator.endBatch(eof);
+ eof = readerOrchestrator.endBatch(eof);
// Return EOF (false) only when the reader reports EOF
// and the result set loader has drained its rows from either
// this batch or lookahead rows.
- return ! eof || tableLoader.hasRows();
+ return !eof || tableLoader.hasRows();
}
@Override
@@ -197,7 +206,7 @@ public class ShimBatchReader implements RowBatchReader, NegotiatorListener {
this.schemaNegotiator = schemaNegotiator;
readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
tableLoader = readerOrchestrator.makeTableLoader(schemaNegotiator.errorContext(),
- schemaNegotiator.tableSchema);
+ schemaNegotiator.tableSchema, schemaNegotiator.limit);
return tableLoader;
}
}
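The revised `next()` above combines three signals: reader EOF, lookahead rows still held by the result set loader, and the new per-reader limit. Reduced to a pure function, the decision is the one below; `NextResult`, its method, and its parameter names are invented for illustration.

```java
// Mirrors the return logic in ShimBatchReader.next() above, as a pure
// function. A reader is "done" only when it reported EOF and the result set
// loader holds no lookahead rows, or when the pushed-down limit was reached.
class NextResult {
  static boolean hasMore(boolean readerEof, boolean loaderHasRows, boolean atLimit) {
    if (atLimit) {
      return false;                        // limit reached: stop even mid-data
    }
    return !readerEof || loaderHasRows;    // EOF plus drained lookahead means done
  }
}
```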
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 56fb481..6c9d3fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -42,6 +42,7 @@ import java.util.List;
public class ReaderSchemaOrchestrator implements VectorSource {
private final ScanSchemaOrchestrator scanOrchestrator;
+ private long limit;
private int readerBatchSize;
private ResultSetLoaderImpl tableLoader;
private int prevTableSchemaVersion = -1;
@@ -55,8 +56,10 @@ public class ReaderSchemaOrchestrator implements VectorSource {
private ResolvedRow rootTuple;
private VectorContainer tableContainer;
- public ReaderSchemaOrchestrator(ScanSchemaOrchestrator scanSchemaOrchestrator) {
+ public ReaderSchemaOrchestrator(
+ ScanSchemaOrchestrator scanSchemaOrchestrator, long limit) {
scanOrchestrator = scanSchemaOrchestrator;
+ this.limit = limit;
readerBatchSize = scanOrchestrator.options.scanBatchRecordLimit;
}
@@ -68,10 +71,11 @@ public class ReaderSchemaOrchestrator implements VectorSource {
@VisibleForTesting
public ResultSetLoader makeTableLoader(TupleMetadata readerSchema) {
- return makeTableLoader(scanOrchestrator.scanProj.context(), readerSchema);
+ return makeTableLoader(scanOrchestrator.scanProj.context(), readerSchema, -1);
}
- public ResultSetLoader makeTableLoader(CustomErrorContext errorContext, TupleMetadata readerSchema) {
+ public ResultSetLoader makeTableLoader(CustomErrorContext errorContext,
+ TupleMetadata readerSchema, long localLimit) {
ResultSetOptionBuilder options = new ResultSetOptionBuilder();
options.rowCountLimit(Math.min(readerBatchSize, scanOrchestrator.options.scanBatchRecordLimit));
options.vectorCache(scanOrchestrator.vectorCache);
@@ -86,6 +90,12 @@ public class ReaderSchemaOrchestrator implements VectorSource {
// adds a column later.
options.projectionFilter(scanOrchestrator.scanProj.readerProjection);
options.readerSchema(readerSchema);
+ if (limit < 0) {
+ limit = localLimit;
+ } else if (localLimit >= 0) {
+ limit = Math.min(localLimit, limit);
+ }
+ options.limit(limit);
// Create the table loader
tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator,
options.build());
@@ -101,8 +111,8 @@ public class ReaderSchemaOrchestrator implements VectorSource {
endBatch();
}
- public void startBatch() {
- tableLoader.startBatch();
+ public boolean startBatch() {
+ return tableLoader.startBatch();
}
/**
@@ -121,7 +131,7 @@ public class ReaderSchemaOrchestrator implements VectorSource {
*
* @param eof is end of file
*/
- public void endBatch(boolean eof) {
+ public boolean endBatch(boolean eof) {
// Get the batch results in a container.
tableContainer = tableLoader.harvest();
@@ -140,7 +150,10 @@ public class ReaderSchemaOrchestrator implements VectorSource {
if (projected) {
projectMetadata(false);
}
- rootTuple.setRowCount(tableContainer.getRecordCount());
+ int rowCount = tableContainer.getRecordCount();
+ rootTuple.setRowCount(rowCount);
+ scanOrchestrator.tallyBatch(rowCount);
+ return eof || tableLoader.atLimit();
}
/**
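The new merge logic in `makeTableLoader()` above reconciles the scan-wide limit with the reader's `localLimit`, where -1 means "no limit" on either side and the tighter limit otherwise wins. That rule, isolated as a small function; `LimitMerge` is not a Drill class, just a sketch of the hunk's behavior.

```java
// Mirrors the limit-merging branch added to makeTableLoader() above.
class LimitMerge {
  static long merge(long scanLimit, long localLimit) {
    if (scanLimit < 0) {
      return localLimit;                       // only the per-reader limit applies
    }
    if (localLimit >= 0) {
      return Math.min(localLimit, scanLimit);  // both set: the tighter one wins
    }
    return scanLimit;                          // only the scan-level limit applies
  }
}
```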
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index b1b3398..4f76132 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -208,6 +208,11 @@ public class ScanSchemaOrchestrator {
private CustomErrorContext errorContext;
/**
+ * Pushed-down scan LIMIT.
+ */
+ private long limit = -1;
+
+ /**
* Specify an optional metadata manager. Metadata is a set of constant
* columns with per-reader values. For file-based sources, this is usually
* the implicit and partition columns; but it could be other items for other
@@ -291,6 +296,12 @@ public class ScanSchemaOrchestrator {
return providedSchema;
}
+ public void limit(long limit) {
+ this.limit = Math.max(-1, limit);
+ }
+
+ public long limit() { return limit; }
+
public void errorContext(CustomErrorContext context) {
this.errorContext = context;
}
@@ -301,8 +312,7 @@ public class ScanSchemaOrchestrator {
@VisibleForTesting
public ScanOperatorExec buildScan() {
- return new ScanOperatorExec(buildEvents(),
- ! disableEmptyResults);
+ return new ScanOperatorExec(buildEvents(), !disableEmptyResults);
}
public OperatorRecordBatch buildScanOperator(FragmentContext fragContext,
PhysicalOperator pop) {
@@ -335,6 +345,7 @@ public class ScanSchemaOrchestrator {
public final boolean useSchemaSmoothing;
public final boolean allowRequiredNullColumns;
public final TupleMetadata providedSchema;
+ public final long limit;
/**
* Context for error messages.
@@ -352,6 +363,7 @@ public class ScanSchemaOrchestrator {
context = builder.errorContext;
providedSchema = builder.providedSchema;
allowRequiredNullColumns = builder.allowRequiredNullColumns;
+ limit = builder.limit < 0 ? Long.MAX_VALUE : builder.limit;
}
protected TupleMetadata providedSchema() {
@@ -383,6 +395,8 @@ public class ScanSchemaOrchestrator {
protected final ScanLevelProjection scanProj;
private ReaderSchemaOrchestrator currentReader;
protected final SchemaSmoother schemaSmoother;
+ protected int batchCount;
+ protected long rowCount;
// Output
@@ -435,7 +449,7 @@ public class ScanSchemaOrchestrator {
public ReaderSchemaOrchestrator startReader() {
closeReader();
- currentReader = new ReaderSchemaOrchestrator(this);
+ currentReader = new ReaderSchemaOrchestrator(this, options.limit - rowCount);
return currentReader;
}
@@ -458,6 +472,15 @@ public class ScanSchemaOrchestrator {
return outputContainer;
}
+ public void tallyBatch(int rowCount) {
+ this.batchCount++;
+ this.rowCount += rowCount;
+ }
+
+ public boolean atLimit() {
+ return batchCount > 0 && rowCount >= options.limit;
+ }
+
public void closeReader() {
if (currentReader != null) {
currentReader.close();
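The `tallyBatch()`/`atLimit()` pair above counts batches as well as rows, so a schema-only (zero-row) first batch is always delivered even when a `LIMIT 0` is pushed down. A standalone sketch of that accounting; `ScanTally` is an invented class, with the limit already normalized to `Long.MAX_VALUE` for "no limit" as the options class does.

```java
// Mirrors tallyBatch()/atLimit() in ScanSchemaOrchestrator above.
class ScanTally {
  private int batchCount;
  private long rowCount;
  private final long limit;   // normalized: "no limit" callers pass Long.MAX_VALUE

  ScanTally(long limit) { this.limit = limit; }

  void tallyBatch(int rows) {
    batchCount++;
    rowCount += rows;
  }

  // "At limit" only after at least one batch, so the first (possibly empty)
  // batch always goes downstream to announce the schema.
  boolean atLimit() {
    return batchCount > 0 && rowCount >= limit;
  }
}
```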
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
index 393fc23..c3bd1f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
@@ -153,6 +153,11 @@ public class ScanLifecycleBuilder {
*/
protected CustomErrorContext errorContext;
+ /**
+ * Pushed-down scan LIMIT.
+ */
+ private long limit = Long.MAX_VALUE;
+
public void options(OptionSet options) {
this.options = options;
}
@@ -288,6 +293,18 @@ public class ScanLifecycleBuilder {
public SchemaValidator schemaValidator() { return schemaValidator; }
+ public void limit(long limit) {
+ // Operator definitions use -1 for "no limit", this mechanism
+ // uses a very big number for "no limit."
+ if (limit < 0) {
+ this.limit = Long.MAX_VALUE;
+ } else {
+ this.limit = limit;
+ }
+ }
+
+ public long limit() { return limit; }
+
public ScanLifecycle build(OperatorContext context) {
return new ScanLifecycle(context, this);
}
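The builder above normalizes the operator's -1 "no limit" convention to `Long.MAX_VALUE`, so downstream `rowCount >= limit` comparisons need no special case. As a one-line sketch (`LimitNorm` is not a Drill class):

```java
// Mirrors ScanLifecycleBuilder.limit() above: -1 (or any negative value)
// means "no limit" at the operator level; the lifecycle uses a huge number
// instead so that ordinary comparisons work unchanged.
class LimitNorm {
  static long normalize(long limit) {
    return limit < 0 ? Long.MAX_VALUE : limit;
  }
}
```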
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
index f322f51..4cb5ad8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
@@ -94,9 +94,10 @@ import org.slf4j.LoggerFactory;
public class ReaderLifecycle implements RowBatchReader {
private static final Logger logger = LoggerFactory.getLogger(ReaderLifecycle.class);
- private enum State { START, DATA, FINAL, EOF }
+ private enum State { START, DATA, FINAL, LIMIT, EOF }
private final ScanLifecycle scanLifecycle;
+ private final long limit;
protected final TupleMetadata readerInputSchema;
private ManagedReader reader;
private final SchemaNegotiatorImpl schemaNegotiator;
@@ -107,8 +108,9 @@ public class ReaderLifecycle implements RowBatchReader {
private OutputBatchBuilder outputBuilder;
private State state = State.START;
- public ReaderLifecycle(ScanLifecycle scanLifecycle) {
+ public ReaderLifecycle(ScanLifecycle scanLifecycle, long limit) {
this.scanLifecycle = scanLifecycle;
+ this.limit = limit;
this.readerInputSchema = schemaTracker().readerInputSchema();
this.schemaNegotiator = scanLifecycle.newNegotiator(this);
}
@@ -127,6 +129,9 @@ public class ReaderLifecycle implements RowBatchReader {
@Override
public String name() {
+ if (reader == null) {
+ return getClass().getSimpleName();
+ }
return reader.getClass().getSimpleName();
}
@@ -164,13 +169,15 @@ public class ReaderLifecycle implements RowBatchReader {
public ResultSetLoader buildLoader() {
Preconditions.checkState(state == State.START);
+ ScanLifecycleBuilder scanOptions = scanOptions();
ResultSetOptionBuilder options = new ResultSetOptionBuilder()
- .rowCountLimit(Math.min(schemaNegotiator.batchSize, scanOptions().scanBatchRecordLimit()))
+ .rowCountLimit(Math.min(schemaNegotiator.batchSize, scanOptions.scanBatchRecordLimit()))
.vectorCache(scanLifecycle.vectorCache())
- .batchSizeLimit(scanOptions().scanBatchByteLimit())
+ .batchSizeLimit(scanOptions.scanBatchByteLimit())
.errorContext(errorContext())
.projectionFilter(schemaTracker().projectionFilter(errorContext()))
- .readerSchema(schemaNegotiator.readerSchema);
+ .readerSchema(schemaNegotiator.readerSchema)
+ .limit(limit);
// Resolve the scan schema if possible.
applyEarlySchema();
@@ -214,8 +221,15 @@ public class ReaderLifecycle implements RowBatchReader {
// The reader may report EOF, but the result set loader might
// have a lookahead row.
- if (state == State.EOF) {
+ switch (state) {
+ case EOF:
return false;
+ case LIMIT:
+ outputBuilder = null;
+ state = State.EOF;
+ return false;
+ default:
+ break;
}
// Prepare for the batch.
@@ -226,10 +240,16 @@ public class ReaderLifecycle implements RowBatchReader {
// a new batch just to learn about EOF. Don't read if the reader
// already reported EOF. In that case, we're just processing any last
// lookahead row in the result set loader.
+ //
+ // If the scan has hit its pushed-down limit, then the reader might
+ // return EOF, or it might remain blissfully ignorant about why the
+ // batch was full. Double-check the limit here.
if (state == State.DATA) {
try {
if (!reader.next()) {
state = State.FINAL;
+ } else if (tableLoader.atLimit()) {
+ state = State.LIMIT;
}
} catch (UserException e) {
throw e;
@@ -282,8 +302,8 @@ public class ReaderLifecycle implements RowBatchReader {
if (tableLoader.batchCount() == 1 || prevTableSchemaVersion < tableLoader.schemaVersion()) {
reviseOutputProjection(tableLoader.outputSchema());
}
- buildOutputBatch(readerOutput);
- scanLifecycle.tallyBatch();
+ int rowCount = buildOutputBatch(readerOutput);
+ scanLifecycle.tallyBatch(rowCount);
}
/**
@@ -337,7 +357,7 @@ public class ReaderLifecycle implements RowBatchReader {
.nullType(scanOptions().nullType());
}
- private void buildOutputBatch(VectorContainer readerContainer) {
+ private int buildOutputBatch(VectorContainer readerContainer) {
// Create the implicit columns loader after the first
// batch so we can report if the file is empty.
@@ -358,6 +378,7 @@ public class ReaderLifecycle implements RowBatchReader {
createOutputBuilder();
}
outputBuilder.load(rowCount);
+ return rowCount;
}
private void createOutputBuilder() {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
index 3261a05..b77f0cf 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
@@ -136,6 +136,7 @@ public class ScanLifecycle {
private final ScanSchemaTracker schemaTracker;
private final ReaderFactory<?> readerFactory;
private int batchCount;
+ private long rowCount;
/**
* Cache used to preserve the same vectors from one output batch to the
@@ -171,15 +172,24 @@ public class ScanLifecycle {
public boolean hasOutputSchema() { return schemaTracker.isResolved(); }
public CustomErrorContext errorContext() { return options.errorContext(); }
public BufferAllocator allocator() { return context.getAllocator(); }
- public void tallyBatch() { batchCount++; }
public int batchCount() { return batchCount; }
+ public long rowCount() { return rowCount; }
+
+ public void tallyBatch(int rowCount) {
+ batchCount++;
+ this.rowCount += rowCount;
+ }
public RowBatchReader nextReader() {
- if (readerFactory.hasNext()) {
- return new ReaderLifecycle(this);
- } else {
+ // Check the limit, but do at least one (zero-row) batch
+ // to capture schema.
+ if (batchCount > 0 && rowCount >= options.limit()) {
+ return null;
+ }
+ if (!readerFactory.hasNext()) {
return null;
}
+ return new ReaderLifecycle(this, options.limit() - rowCount);
}
protected SchemaNegotiatorImpl newNegotiator(ReaderLifecycle
readerLifecycle) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
index 24ad466..535a758 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
@@ -53,6 +53,11 @@ import org.apache.drill.exec.vector.ValueVector;
* a vector written by the reader, in the second, it is a null column
* filled in by the scan projector (assuming, of course, that "c"
* is nullable or an array.)
+ * <p>
+ * Pushes limits into the result set loader. The caller must
* either check the return value from {@code startBatch()}, or
+ * call {@code atLimit()} after {@code harvest()} to detect when the scan
+ * has reached the limit. Treat the limit condition the same as EOF.
*/
public class SchemaNegotiatorImpl implements SchemaNegotiator {
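The protocol the javadoc above describes — fill a batch until the writer reports full, then treat a hit limit the same as EOF — can be sketched outside Drill with a toy loader. This is a minimal, self-contained simulation for illustration only; `ToyLoader` and its methods are stand-ins, not Drill's actual `ResultSetLoader` API.

```java
// Toy stand-in (NOT Drill's API) for the limit protocol described above:
// startBatch() returns false once the limit is reached; isFull() reports
// full when either the batch target or the scan limit is hit; atLimit()
// after harvest() tells the reader to treat the condition as EOF.
class ToyLoader {
    private final long scanLimit;       // total rows the scan may produce
    private final int batchTarget = 50; // normal per-batch row target
    private long totalRows;             // rows across harvested batches
    private int batchRows;              // rows in the current batch

    ToyLoader(long scanLimit) { this.scanLimit = scanLimit; }

    boolean startBatch() {              // false once the limit is reached
        if (atLimit()) {
            return false;
        }
        batchRows = 0;
        return true;
    }

    boolean isFull() {
        return batchRows >= batchTarget || totalRows >= scanLimit;
    }

    void writeRow() { batchRows++; totalRows++; }

    int harvest() { return batchRows; }

    boolean atLimit() { return totalRows >= scanLimit; }
}
```

With a limit of 75 and a 50-row batch target, a reader loop over this toy loader produces a 50-row batch, then a 25-row batch, after which `startBatch()` returns false — the same shape as the LIMIT 75 scenarios in the new test classes.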
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
index cf44229..35c24b2 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
@@ -71,6 +71,12 @@ public interface ResultSetLoader {
int targetRowCount();
/**
+ * The maximum number of rows for the present batch. Will be the lesser
* of the {@link #targetRowCount()} and the overall scan limit remaining.
+ */
+ int maxBatchSize();
+
+ /**
* The largest vector size produced by this loader (as specified by
* the value vector limit.)
*
@@ -94,7 +100,7 @@ public interface ResultSetLoader {
* current batch.
* @return total row count
*/
- int totalRowCount();
+ long totalRowCount();
/**
* Report whether the loader currently holds rows. If within a batch,
@@ -109,8 +115,11 @@ public interface ResultSetLoader {
/**
* Start a new row batch. Valid only when first started, or after the
* previous batch has been harvested.
+ *
+ * @return {@code true} if another batch can be read, {@code false} if
+ * the reader has reached the given scan limit.
*/
- void startBatch();
+ boolean startBatch();
/**
* Writer for the top-level tuple (the entire row). Valid only when
@@ -229,6 +238,16 @@ public interface ResultSetLoader {
VectorContainer harvest();
/**
+ * After a {@link #harvest()} call, call this method to determine if
+ * the scan limit has been hit. If so, treat this as the final batch
+ * for the reader, even if more data is available to read.
+ *
+ * @return {@code true} if the scan has reached a set scan row limit,
+ * {@code false} if there is no limit, or more rows can be read.
+ */
+ boolean atLimit();
+
+ /**
* The schema of the harvested batch. Valid until the start of the
* next batch.
*
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index 462ad2d..f7d29df 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -114,6 +114,7 @@ public interface RowSetLoader extends TupleWriter {
* @param maxRecords Maximum rows to be returned. (From the limit clause of
the query)
* @return True if the row count exceeds the maxRecords, false if not.
*/
+ @Deprecated() // use the limit in options instead
boolean limitReached(int maxRecords);
/**
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index ac10a18..48e0f07 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -52,6 +52,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader,
LoaderInternals {
protected final ProjectionFilter projectionSet;
protected final TupleMetadata schema;
protected final long maxBatchSize;
+ protected final long scanLimit;
/**
* Context for error messages.
@@ -66,6 +67,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader,
LoaderInternals {
schema = null;
maxBatchSize = -1;
errorContext = null;
+ scanLimit = Long.MAX_VALUE;
}
public ResultSetOptions(ResultSetOptionBuilder builder) {
@@ -74,6 +76,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader,
LoaderInternals {
vectorCache = builder.vectorCache;
schema = builder.readerSchema;
maxBatchSize = builder.maxBatchSize;
+ scanLimit = builder.scanLimit;
errorContext = builder.errorContext == null
? EmptyErrorContext.INSTANCE : builder.errorContext;
if (builder.projectionFilter != null) {
@@ -126,7 +129,7 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
IN_OVERFLOW,
/**
- * Batch is full due to reaching the row count limit
+ * Batch is full due to reaching the row count or scan limit
* when saving a row.
* No more writes allowed until harvesting the current batch.
*/
@@ -162,6 +165,12 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
LOOK_AHEAD,
/**
+ * The loader has reached the given scan limit. No further batches
+ * can be started.
+ */
+ LIMITED,
+
+ /**
* Mutator is closed: no more operations are allowed.
*/
CLOSED
@@ -238,7 +247,7 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
* Counts the rows included in previously-harvested batches. Does not
* include the number of rows in the current batch.
*/
- private int previousRowCount;
+ private long previousRowCount;
/**
* Number of rows in the harvest batch. If an overflow batch is in effect,
@@ -257,6 +266,13 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
private int targetRowCount;
/**
+ * The number of rows allowed for the current batch. Is the lesser of the
+ * maximum batch size, the target row count, or the remaining margin on
+ * the scan limit.
+ */
+ private int batchSizeLimit;
+
+ /**
* Total bytes allocated to the current batch.
*/
protected int accumulatedBatchSize;
@@ -358,30 +374,30 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
case HARVESTED:
case LOOK_AHEAD:
case START:
+ case LIMITED:
// Batch is published. Use harvest schema.
return harvestSchemaVersion;
default:
- // Not really in a position to give a schema
- // version.
+ // Not really in a position to give a schema version.
throw new IllegalStateException("Unexpected state: " + state);
}
}
@Override
- public void startBatch() {
- startBatch(false);
+ public boolean startBatch() {
+ return startBatch(false);
}
/**
* Start a batch to report only schema without data.
*/
- public void startEmptyBatch() {
- startBatch(true);
+ public boolean startEmptyBatch() {
+ return startBatch(true);
}
- public void startBatch(boolean schemaOnly) {
+ public boolean startBatch(boolean schemaOnly) {
switch (state) {
case HARVESTED:
case START:
@@ -412,6 +428,9 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
// the writers.
break;
+ case LIMITED:
+ return false;
+
default:
throw new IllegalStateException("Unexpected state: " + state);
}
@@ -420,7 +439,9 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
// updates
harvestSchemaVersion = activeSchemaVersion;
pendingRowCount = 0;
+ batchSizeLimit = (int) Math.min(targetRowCount, options.scanLimit -
totalRowCount());
state = State.ACTIVE;
+ return true;
}
@Override
@@ -466,6 +487,8 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
harvestSchemaVersion = activeSchemaVersion;
rootWriter.startRow();
break;
+ case LIMITED:
+ throw new IllegalStateException("Attempt to write past the scan
limit.");
default:
throw new IllegalStateException("Unexpected state: " + state);
}
@@ -523,6 +546,8 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
case OVERFLOW:
case FULL_BATCH:
return true;
+ case LIMITED:
+ return true;
default:
return false;
}
@@ -530,7 +555,8 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
@Override
public boolean writeable() {
- return state == State.ACTIVE || state == State.OVERFLOW;
+ return (state == State.ACTIVE || state == State.OVERFLOW) &&
+ !atLimit();
}
private boolean isBatchActive() {
@@ -560,7 +586,7 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
@Override
public void setTargetRowCount(int rowCount) {
- targetRowCount = Math.max(1, rowCount);
+ targetRowCount = Math.min(Math.max(1, rowCount),
ValueVector.MAX_ROW_COUNT);
}
@Override
@@ -570,6 +596,9 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
public int targetVectorSize() { return options.vectorSizeLimit; }
@Override
+ public int maxBatchSize() { return batchSizeLimit; }
+
+ @Override
public int skipRows(int requestedCount) {
// Can only skip rows when a batch is active.
@@ -692,6 +721,10 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
harvestBatchCount++;
previousRowCount += rowCount;
+
+ if (atLimit()) {
+ state = State.LIMITED;
+ }
return container;
}
@@ -712,6 +745,19 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
}
@Override
+ public boolean atLimit() {
+ switch (state) {
+ case LIMITED:
+ return true;
+ case ACTIVE:
+ case HARVESTED:
+ return totalRowCount() >= options.scanLimit;
+ default:
+ return false;
+ }
+ }
+
+ @Override
public TupleMetadata outputSchema() {
return rootState.outputSchema();
}
@@ -739,8 +785,8 @@ public class ResultSetLoaderImpl implements
ResultSetLoader, LoaderInternals {
}
@Override
- public int totalRowCount() {
- int total = previousRowCount;
+ public long totalRowCount() {
+ long total = previousRowCount;
if (isBatchActive()) {
total += pendingRowCount + writerIndex.size();
}
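The per-batch accounting added to the loader above can be shown in isolation. This sketch mirrors the new `batchSizeLimit` computation — each batch may hold at most the lesser of the normal row target and the rows remaining under the scan limit — using illustrative names (`LimitAccounting`), not Drill's actual classes.

```java
// Illustrative sketch (not Drill code) of the limit accounting:
// batchSizeLimit = min(targetRowCount, scanLimit - totalRowCount).
class LimitAccounting {
    private final long scanLimit;      // Long.MAX_VALUE means "no limit"
    private final int targetRowCount;  // normal per-batch row target
    private long totalRowCount;        // rows in all harvested batches

    LimitAccounting(long limit, int targetRowCount) {
        // A negative limit means "no limit", as in the builder's limit() method
        this.scanLimit = limit < 0 ? Long.MAX_VALUE : limit;
        this.targetRowCount = targetRowCount;
    }

    /** Rows permitted in the next batch. */
    int maxBatchSize() {
        return (int) Math.min(targetRowCount, scanLimit - totalRowCount);
    }

    /** Record a harvested batch's row count. */
    void tallyBatch(int rowCount) { totalRowCount += rowCount; }

    /** True once the scan has produced all the rows the limit allows. */
    boolean atLimit() { return totalRowCount >= scanLimit; }
}
```

For a scan limit of 75 with a 50-row target, the first batch is capped at 50 rows and the second at 25; after tallying both, `atLimit()` is true and `maxBatchSize()` drops to 0, so no further batch is started.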
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
index a4e75b2..5a041a9 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
@@ -38,6 +38,7 @@ public class ResultSetOptionBuilder {
protected ProjectionFilter projectionFilter;
protected TupleMetadata readerSchema;
protected long maxBatchSize;
+ protected long scanLimit = Long.MAX_VALUE;
/**
* Error message context
@@ -114,6 +115,15 @@ public class ResultSetOptionBuilder {
return this;
}
+ public ResultSetOptionBuilder limit(long limit) {
+ if (limit < 0) {
+ this.scanLimit = Long.MAX_VALUE;
+ } else {
+ this.scanLimit = limit;
+ }
+ return this;
+ }
+
/**
* Provides context for error messages.
*/
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
index d907a59..d66798a 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
@@ -74,6 +74,9 @@ public class RowSetLoaderImpl extends AbstractTupleWriter
implements RowSetLoade
@Override
public boolean start() {
+ if (rsLoader.atLimit()) {
+ return false;
+ }
if (rsLoader.isFull()) {
// Full batch? Return false.
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
index fe73cbb..dc59e0b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
@@ -89,7 +89,7 @@ class WriterIndexImpl implements ColumnWriterIndex {
return rowIndex;
}
- public boolean valid() { return rowIndex < rsLoader.targetRowCount(); }
+ public boolean valid() { return rowIndex < rsLoader.maxBatchSize(); }
@Override
public void rollover() {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
index 4edaddb..568c30c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
@@ -108,6 +108,7 @@ public class ExtendedMockBatchReader implements
ManagedReader<SchemaNegotiator>
if (batchSize > 0) {
schemaNegotiator.batchSize(batchSize);
}
+ schemaNegotiator.limit(config.getRecords());
loader = schemaNegotiator.build();
writer = loader.writer();
@@ -119,16 +120,8 @@ public class ExtendedMockBatchReader implements
ManagedReader<SchemaNegotiator>
@Override
public boolean next() {
- final int rowCount = config.getRecords() - loader.totalRowCount();
- if (rowCount <= 0) {
- return false;
- }
-
final Random rand = new Random();
- for (int i = 0; i < rowCount; i++) {
- if (writer.isFull()) {
- break;
- }
+ while (!writer.isFull()) {
writer.start();
for (int j = 0; j < fields.length; j++) {
if (fields[j].nullable && rand.nextInt(100) <
fields[j].nullablePercent) {
@@ -140,7 +133,7 @@ public class ExtendedMockBatchReader implements
ManagedReader<SchemaNegotiator>
writer.save();
}
- return true;
+ return !loader.atLimit();
}
@Override
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
index 5633943..f394bcf 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.SubOperatorTest;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test of the scan operator framework. Here the focus is on the
@@ -46,9 +48,8 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
* details of another, supporting class, then tests for that class
* appear elsewhere.
*/
-
public class BaseScanOperatorExecTest extends SubOperatorTest {
- static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(BaseScanOperatorExecTest.class);
+ static final Logger logger =
LoggerFactory.getLogger(BaseScanOperatorExecTest.class);
/**
* Base class for the "mock" readers used in this test. The mock readers
@@ -57,7 +58,6 @@ public class BaseScanOperatorExecTest extends SubOperatorTest
{
* They also expose internal state such as identifying which methods
* were actually called.
*/
-
protected static abstract class BaseMockBatchReader implements
ManagedReader<SchemaNegotiator> {
protected boolean openCalled;
protected boolean closeCalled;
@@ -94,7 +94,6 @@ public class BaseScanOperatorExecTest extends SubOperatorTest
{
* Mock reader that pretends to have a schema at open time
* like an HBase or JDBC reader.
*/
-
protected static class MockEarlySchemaReader extends BaseMockBatchReader {
@Override
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
index 2566bcd..938f384 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
@@ -378,7 +378,6 @@ public class TestScanOperExecBasics extends
BaseScanOperatorExecTest {
* early schema. Results in an empty (rather than null)
* result set.
*/
-
@Test
public void testMultiEOFOnFirstBatch() {
MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
index 5c6fa80..b50394d 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
@@ -41,7 +41,6 @@ public class TestScanOperExecEarlySchema extends
BaseScanOperatorExecTest {
/**
* Mock reader that returns no schema and no records.
*/
-
private static class MockNullEarlySchemaReader extends BaseMockBatchReader {
@Override
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
new file mode 100644
index 0000000..b169f58
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at the scan level by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+ /**
+ * Mock reader that returns two 50-row batches.
+ */
+ protected static class Mock50RowReader implements
ManagedReader<SchemaNegotiator> {
+ protected boolean openCalled;
+ protected ResultSetLoader tableLoader;
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ openCalled = true;
+ negotiator.tableSchema(new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .build(), true);
+ tableLoader = negotiator.build();
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ if (tableLoader.batchCount() > 2) {
+ return false;
+ }
+ RowSetLoader rowSet = tableLoader.writer();
+ int base = tableLoader.batchCount() * 50 + 1;
+ for (int i = 0; i < 50; i++) {
+ if (rowSet.isFull()) {
+ break;
+ }
+ rowSet.addSingleCol(base + i);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ /**
+ * LIMIT 0, to obtain only the schema.
+ */
+ @Test
+ public void testLimit0() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(0);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(0, batch.rowCount());
+ assertEquals(1, batch.schema().getFieldCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertFalse(reader2.openCalled);
+ }
+
+ /**
+ * LIMIT 1, simplest case
+ */
+ @Test
+ public void testLimit1() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(1);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(1, batch.rowCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertFalse(reader2.openCalled);
+ }
+
+ /**
+ * LIMIT 50, same as batch size, to check boundary conditions.
+ */
+ @Test
+ public void testLimitOnBatchEnd() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(50);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertFalse(reader2.openCalled);
+ }
+
+ /**
+ * LIMIT 75, halfway through second batch.
+ */
+ @Test
+ public void testLimitOnSecondBatch() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(75);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(25, batch.rowCount());
+ batch.release();
+
+ // No second reader
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertFalse(reader2.openCalled);
+ }
+
+ /**
+ * LIMIT 100, at EOF of the first reader.
+ */
+ @Test
+ public void testLimitOnEOF() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(100);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // No second reader
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertFalse(reader2.openCalled);
+ }
+
+ /**
+ * LIMIT 125: full first reader, limit on second
+ */
+ @Test
+ public void testLimitOnSecondReader() {
+ Mock50RowReader reader1 = new Mock50RowReader();
+ Mock50RowReader reader2 = new Mock50RowReader();
+
+ BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+ builder.builder.limit(125);
+ ScanFixture scanFixture = builder.build();
+ ScanOperatorExec scan = scanFixture.scanOp;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // First batch, second reader
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(25, batch.rowCount());
+ batch.release();
+
+ // No second batch
+ assertFalse(scan.next());
+
+ scanFixture.close();
+
+ assertTrue(reader1.openCalled);
+ assertTrue(reader2.openCalled);
+ }
+}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
index 225a3fc..21db742 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
@@ -44,7 +44,6 @@ public class TestScanOperExecOverflow extends
BaseScanOperatorExecTest {
* Mock reader that produces "jumbo" batches that cause a vector to
* fill and a row to overflow from one batch to the next.
*/
-
private static class OverflowReader extends BaseMockBatchReader {
private final String value;
@@ -111,7 +110,6 @@ public class TestScanOperExecOverflow extends
BaseScanOperatorExecTest {
* that overflow. Specifically, test a corner case. A batch ends right
* at file EOF, but that last batch overflowed.
*/
-
@Test
public void testMultipleReadersWithOverflow() {
runOverflowTest(false);
@@ -191,5 +189,4 @@ public class TestScanOperExecOverflow extends
BaseScanOperatorExecTest {
assertEquals(0, scan.batchAccessor().rowCount());
scanFixture.close();
}
-
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
new file mode 100644
index 0000000..19c7a84
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits at the scan level by skipping over unneeded readers.
+ * <p>
+ * This test is from the outside: at the scan operator level.
+ */
+public class TestScanLimit extends BaseScanTest {
+
+ /**
+ * Mock reader that returns two 50-row batches.
+ */
+ protected static class Mock50RowReader implements ManagedReader {
+
+ private final ResultSetLoader tableLoader;
+
+ public Mock50RowReader(SchemaNegotiator negotiator) {
+ negotiator.tableSchema(new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .build());
+ tableLoader = negotiator.build();
+ }
+
+ @Override
+ public boolean next() {
+ if (tableLoader.batchCount() > 1) {
+ return false;
+ }
+ RowSetLoader rowSet = tableLoader.writer();
+ int base = tableLoader.batchCount() * 50 + 1;
+ for (int i = 0; i < 50; i++) {
+ if (rowSet.isFull()) {
+ break;
+ }
+ rowSet.addSingleCol(base + i);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ private static class TestFixture {
+ ObservableCreator creator1;
+ ObservableCreator creator2;
+ ScanFixture scanFixture;
+ ScanOperatorExec scan;
+
+ public TestFixture(long limit) {
+ creator1 = new ObservableCreator() {
+ @Override
+ public ManagedReader create(SchemaNegotiator negotiator) {
+ return new Mock50RowReader(negotiator);
+ }
+ };
+ creator2 = new ObservableCreator() {
+ @Override
+ public ManagedReader create(SchemaNegotiator negotiator) {
+ return new Mock50RowReader(negotiator);
+ }
+ };
+ BaseScanFixtureBuilder builder = simpleBuilder(creator1, creator2);
+ builder.builder.limit(limit);
+ scanFixture = builder.build();
+ scan = scanFixture.scanOp;
+ }
+
+ public void close() { scanFixture.close(); }
+
+ public int createCount() {
+ if (creator1.reader == null) {
+ return 0;
+ }
+ if (creator2.reader == null) {
+ return 1;
+ }
+ return 2;
+ }
+ }
+
+ /**
+ * LIMIT 0, to obtain only the schema.
+ */
+ @Test
+ public void testLimit0() {
+ TestFixture fixture = new TestFixture(0);
+ ScanOperatorExec scan = fixture.scan;
+
+ assertTrue(scan.buildSchema());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(0, batch.rowCount());
+ assertEquals(1, batch.schema().getFieldCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, fixture.createCount());
+ }
+
+ /**
+ * LIMIT 1, simplest case
+ */
+ @Test
+ public void testLimit1() {
+ TestFixture fixture = new TestFixture(1);
+ ScanOperatorExec scan = fixture.scan;
+
+ // Reader builds schema, and stops after one row, though the reader
+ // itself is happy to provide more.
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(1, batch.rowCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, fixture.createCount());
+ }
+
+ /**
+ * LIMIT 50, same as batch size, to check boundary conditions.
+ */
+ @Test
+ public void testLimitOnBatchEnd() {
+ TestFixture fixture = new TestFixture(50);
+ ScanOperatorExec scan = fixture.scan;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // No second batch or second reader
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, fixture.createCount());
+ }
+
+ /**
+ * LIMIT 75, halfway through second batch.
+ */
+ @Test
+ public void testLimitOnSecondBatch() {
+ TestFixture fixture = new TestFixture(75);
+ ScanOperatorExec scan = fixture.scan;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(25, batch.rowCount());
+ batch.release();
+
+ // No second reader
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, fixture.createCount());
+ }
+
+ /**
+ * LIMIT 100, at EOF of the first reader.
+ */
+ @Test
+ public void testLimitOnEOF() {
+ TestFixture fixture = new TestFixture(100);
+ ScanOperatorExec scan = fixture.scan;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // No second reader
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, fixture.createCount());
+ }
+
+ /**
+ * LIMIT 125: full first reader, limit on second
+ */
+ @Test
+ public void testLimitOnSecondReader() {
+ TestFixture fixture = new TestFixture(125);
+ ScanOperatorExec scan = fixture.scan;
+
+ assertTrue(scan.buildSchema());
+ assertTrue(scan.next());
+ BatchAccessor batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(50, batch.rowCount());
+ batch.release();
+
+ // First batch, second reader
+ assertTrue(scan.next());
+ batch = scan.batchAccessor();
+ assertEquals(25, batch.rowCount());
+ batch.release();
+
+ // No second batch
+ assertFalse(scan.next());
+
+ fixture.close();
+
+ // Both readers were created.
+ assertEquals(2, fixture.createCount());
+ }
+}
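The scan-level enforcement the tests above verify (stop handing out readers once the cumulative row count reaches the limit) can be sketched as follows. This is an illustrative model, not Drill's API: `LimitTracker` and `rowsPerReader` are hypothetical names.

```java
// Sketch of scan-level LIMIT enforcement: a tracker shared across all
// readers counts rows toward the limit; once the limit is reached,
// later readers are never created, matching createCount() above.
public class LimitSketch {

  /** Shared across all readers in a scan; counts rows toward the limit. */
  static class LimitTracker {
    private final long limit;
    private long rowCount;

    LimitTracker(long limit) { this.limit = limit; }

    /** Claim up to {@code batchSize} rows; returns the rows allowed. */
    int claim(int batchSize) {
      long allowed = Math.min(batchSize, limit - rowCount);
      rowCount += allowed;
      return (int) allowed;
    }

    boolean atLimit() { return rowCount >= limit; }
  }

  /** Rows each reader contributes, given per-reader row counts and a limit. */
  static int[] rowsPerReader(int[] readerRows, long limit) {
    LimitTracker tracker = new LimitTracker(limit);
    int[] result = new int[readerRows.length];
    for (int i = 0; i < readerRows.length; i++) {
      if (tracker.atLimit()) {
        break; // later readers are skipped entirely
      }
      result[i] = tracker.claim(readerRows[i]);
    }
    return result;
  }

  public static void main(String[] args) {
    // Mirrors testLimitOnSecondReader: two 100-row readers, LIMIT 125.
    int[] rows = rowsPerReader(new int[] {100, 100}, 125);
    assert rows[0] == 100 && rows[1] == 25;
    // Mirrors testLimit1: only the first reader contributes.
    rows = rowsPerReader(new int[] {100, 100}, 1);
    assert rows[0] == 1 && rows[1] == 0;
  }
}
```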
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
index df1fa0c..3df94ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
@@ -89,6 +89,8 @@ public class BaseTestScanLifecycle extends SubOperatorTest {
public boolean hasNext() {
return counter < 2;
}
+
+ public int count() { return counter; }
}
public static final TupleMetadata SCHEMA = new SchemaBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
new file mode 100644
index 0000000..bfde5ed
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once the readers have produced
+ * the number of rows given by the limit. Verifies that the result set
+ * loader enforces the limit at the batch level, that the reader
+ * lifecycle enforces limits at the reader level, and that the scan
+ * framework enforces limits by skipping over unneeded readers.
+ * <p>
+ * This test is at the level of the scan framework, stripping away
+ * the outer scan operator level.
+ */
+public class TestScanLifecycleLimit extends BaseTestScanLifecycle {
+
+ /**
+ * Mock reader that returns two 50-row batches.
+ */
+ protected static class Mock50RowReader implements ManagedReader {
+
+ private final ResultSetLoader tableLoader;
+
+ public Mock50RowReader(SchemaNegotiator negotiator) {
+ negotiator.tableSchema(new SchemaBuilder()
+ .add("a", MinorType.INT)
+ .build());
+ tableLoader = negotiator.build();
+ }
+
+ @Override
+ public boolean next() {
+ if (tableLoader.batchCount() > 1) {
+ return false;
+ }
+ RowSetLoader rowSet = tableLoader.writer();
+ int base = tableLoader.batchCount() * 50 + 1;
+ for (int i = 0; i < 50; i++) {
+ if (rowSet.isFull()) {
+ break;
+ }
+ rowSet.addSingleCol(base + i);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() { }
+ }
+
+ private Pair<TwoReaderFactory, ScanLifecycle> setupScan(long limit) {
+ ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+ builder.projection(RowSetTestUtils.projectList("a"));
+ TwoReaderFactory factory = new TwoReaderFactory() {
+ @Override
+ public ManagedReader firstReader(SchemaNegotiator negotiator) {
+ return new Mock50RowReader(negotiator);
+ }
+
+ @Override
+ public ManagedReader secondReader(SchemaNegotiator negotiator) {
+ return new Mock50RowReader(negotiator);
+ }
+ };
+ builder.readerFactory(factory);
+
+ builder.limit(limit);
+ return Pair.of(factory, buildScan(builder));
+ }
+
+ /**
+ * LIMIT 0, to obtain only the schema.
+ */
+ @Test
+ public void testLimit0() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(0);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ // Reader builds schema, but returns no data, though the reader
+ // itself is happy to provide data.
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(0, result.rowCount());
+ assertEquals(1, result.schema().size());
+ result.clear();
+
+ // No second batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // No next reader, despite there being two, since we hit the limit.
+ assertNull(scan.nextReader());
+
+ scan.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, factory.count());
+ }
+
+ /**
+ * LIMIT 1, simplest case
+ */
+ @Test
+ public void testLimit1() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(1);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ // Reader builds schema, and stops after one row, though the reader
+ // itself is happy to provide more.
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(1, result.rowCount());
+ assertEquals(1, result.schema().size());
+ result.clear();
+
+ // No second batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // No next reader, despite there being two, since we hit the limit.
+ assertNull(scan.nextReader());
+
+ scan.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, factory.count());
+ }
+
+ /**
+ * LIMIT 50, same as batch size, to check boundary conditions.
+ */
+ @Test
+ public void testLimitOnBatchEnd() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(50);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // No second batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // No next reader, despite there being two, since we hit the limit.
+ assertNull(scan.nextReader());
+
+ scan.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, factory.count());
+ }
+
+ /**
+ * LIMIT 75, halfway through second batch.
+ */
+ @Test
+ public void testLimitOnSecondBatch() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(75);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+
+ // First batch
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // Second batch
+ assertTrue(reader.next());
+ result = fixture.wrap(reader.output());
+ assertEquals(25, result.rowCount());
+ result.clear();
+
+ // No third batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // No next reader, despite there being two, since we hit the limit.
+ assertNull(scan.nextReader());
+
+ scan.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, factory.count());
+ }
+
+ /**
+ * LIMIT 100, at EOF of the first reader.
+ */
+ @Test
+ public void testLimitOnEOF() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(100);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+
+ // First batch
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // Second batch
+ assertTrue(reader.next());
+ result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // No third batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // No next reader, despite there being two, since we hit the limit.
+ assertNull(scan.nextReader());
+
+ scan.close();
+
+ // Only the first of the two readers was created.
+ assertEquals(1, factory.count());
+ }
+
+ /**
+ * LIMIT 125: full first reader, limit on second
+ */
+ @Test
+ public void testLimitOnSecondReader() {
+ Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(125);
+ TwoReaderFactory factory = pair.getLeft();
+ ScanLifecycle scan = pair.getRight();
+
+ RowBatchReader reader = scan.nextReader();
+ assertTrue(reader.open());
+
+ // First batch
+ assertTrue(reader.next());
+ RowSet result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // Second batch
+ assertTrue(reader.next());
+ result = fixture.wrap(reader.output());
+ assertEquals(50, result.rowCount());
+ result.clear();
+
+ // No third batch
+ assertFalse(reader.next());
+ reader.close();
+
+ // Move to second reader.
+ reader = scan.nextReader();
+ assertNotNull(reader);
+ assertTrue(reader.open());
+
+ // First is limited
+ assertTrue(reader.next());
+ result = fixture.wrap(reader.output());
+ assertEquals(25, result.rowCount());
+ result.clear();
+
+ // No second batch
+ assertFalse(reader.next());
+ reader.close();
+
+ scan.close();
+
+ // Both readers were created.
+ assertEquals(2, factory.count());
+ }
+}
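The reader-level pattern Mock50RowReader relies on is that the writer's `isFull()` turns true at either the per-batch row cap or the scan limit, so the reader's write loop needs no limit logic of its own. A minimal sketch of that contract, using a hypothetical `MockRowWriter` (not Drill's `RowSetLoader`):

```java
// Sketch of the writer-side limit contract: isFull() fires at whichever
// comes first, the batch-size cap or the scan-wide LIMIT, so a reader
// that simply checks isFull() stops writing at the right row.
public class WriterLimitSketch {

  static class MockRowWriter {
    private final int batchCap;  // rows allowed per batch
    private final long limit;    // rows allowed for the whole scan
    private int batchRows;
    private long totalRows;

    MockRowWriter(int batchCap, long limit) {
      this.batchCap = batchCap;
      this.limit = limit;
    }

    boolean isFull() {
      return batchRows >= batchCap || totalRows >= limit;
    }

    void addSingleCol(int value) {
      batchRows++;
      totalRows++;
    }

    /** End the batch, returning its row count, as harvest() does. */
    int harvest() {
      int n = batchRows;
      batchRows = 0;
      return n;
    }
  }

  /** The same write loop as Mock50RowReader.next(). */
  static int fillBatch(MockRowWriter writer) {
    for (int i = 0; i < 50; i++) {
      if (writer.isFull()) {
        break;
      }
      writer.addSingleCol(i);
    }
    return writer.harvest();
  }

  public static void main(String[] args) {
    // 50-row batches, LIMIT 75: the second batch stops early at 25 rows,
    // as testLimitOnSecondBatch verifies.
    MockRowWriter writer = new MockRowWriter(50, 75);
    assert fillBatch(writer) == 50;
    assert fillBatch(writer) == 25;
  }
}
```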
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
index 2104326..49ff638 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.resultSet.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Arrays;
@@ -29,7 +30,9 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
@@ -46,7 +49,6 @@ import org.junit.experimental.categories.Category;
 * in fact, depend on the row count) and vector overflow (which can occur when the row limit turns out to be too large.)
*/
-
@Category(RowSetTests.class)
public class TestResultSetLoaderLimits extends SubOperatorTest {
@@ -54,7 +56,6 @@ public class TestResultSetLoaderLimits extends SubOperatorTest {
* Verify that the writer stops when reaching the row limit.
* In this case there is no look-ahead row.
*/
-
@Test
public void testRowLimit() {
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
@@ -100,7 +101,6 @@ public class TestResultSetLoaderLimits extends SubOperatorTest {
/**
* Verify that the caller can set a row limit lower than the default.
*/
-
@Test
public void testCustomRowLimit() {
@@ -183,7 +183,6 @@ public class TestResultSetLoaderLimits extends SubOperatorTest {
/**
* Test that the row limit can change between batches.
*/
-
@Test
public void testDynamicLimit() {
@@ -224,6 +223,108 @@ public class TestResultSetLoaderLimits extends SubOperatorTest {
assertEquals(count, rootWriter.rowCount());
rsLoader.harvest().clear();
+ // Target row counts are clamped to the valid range.
+ rsLoader.setTargetRowCount(-3);
+ assertEquals(1, rsLoader.targetRowCount());
+ rsLoader.setTargetRowCount(Integer.MAX_VALUE);
+ assertEquals(ValueVector.MAX_ROW_COUNT, rsLoader.targetRowCount());
+
+ rsLoader.close();
+ }
+
+ /**
+ * Limit 0 is used to obtain only the schema.
+ */
+ @Test
+ public void testLimit0() {
+ ResultSetOptions options = new ResultSetOptionBuilder()
+ .limit(0)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+ // Can define a schema-only batch.
+ assertTrue(rsLoader.startBatch());
+
+ RowSetLoader rootWriter = rsLoader.writer();
+ rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ // But, can't add any rows.
+ assertTrue(rootWriter.isFull());
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(0, result.rowCount());
+ assertTrue(rsLoader.atLimit());
+ TupleMetadata schema = new SchemaBuilder()
+ .add("s", MinorType.VARCHAR)
+ .buildSchema();
+ assertTrue(schema.equals(result.schema()));
+ result.clear();
+
+ // Can't start a data batch.
+ assertFalse(rsLoader.startBatch());
+
+ // Can't start a row.
+ assertFalse(rootWriter.start());
+
+ rsLoader.close();
+ }
+
+ /**
+ * Pathological limit case: a single row.
+ */
+ @Test
+ public void testLimit1() {
+
+ // Start with a small limit.
+
+ ResultSetOptions options = new ResultSetOptionBuilder()
+ .limit(1)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+ assertTrue(rsLoader.startBatch());
+ assertEquals(1, rsLoader.maxBatchSize());
+ RowSetLoader rootWriter = rsLoader.writer();
+ rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+ rootWriter.addRow("foo");
+ assertTrue(rootWriter.isFull());
+ assertFalse(rootWriter.start());
+ RowSet result = fixture.wrap(rsLoader.harvest());
+ assertEquals(1, result.rowCount());
+ result.clear();
+ assertTrue(rsLoader.atLimit());
+ rsLoader.close();
+ }
+
+ /**
+ * Test filling one batch normally, then hitting the scan limit on the second.
+ */
+ @Test
+ public void testLimit100() {
+ ResultSetOptions options = new ResultSetOptionBuilder()
+ .rowCountLimit(75)
+ .limit(100)
+ .build();
+ ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+
+ RowSetLoader rootWriter = rsLoader.writer();
+ rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, DataMode.REQUIRED));
+
+ rsLoader.startBatch();
+ int count = fillToLimit(rootWriter);
+ assertEquals(75, count);
+ assertEquals(count, rootWriter.rowCount());
+ rsLoader.harvest().clear();
+ assertFalse(rsLoader.atLimit());
+
+ // Second batch will hit the limit
+
+ rsLoader.startBatch();
+ count = fillToLimit(rootWriter);
+ assertEquals(25, count);
+ assertEquals(count, rootWriter.rowCount());
+ rsLoader.harvest().clear();
+ assertTrue(rsLoader.atLimit());
+
rsLoader.close();
}
}
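The interaction that testLimit100 checks (a `rowCountLimit` capping each batch while `limit` caps the whole scan, with the final batch trimmed to whatever remains) reduces to simple arithmetic. The sketch below models just that arithmetic; `batchSizes` is a hypothetical helper, not part of Drill's ResultSetLoader.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of batch sizing under a scan LIMIT: each batch holds up to
// rowCountLimit rows, and the last batch is trimmed to the rows
// remaining under the overall limit.
public class BatchSizes {

  static List<Integer> batchSizes(long limit, int rowCountLimit) {
    List<Integer> sizes = new ArrayList<>();
    long remaining = limit;
    while (remaining > 0) {
      int batch = (int) Math.min(rowCountLimit, remaining);
      sizes.add(batch);
      remaining -= batch;
    }
    return sizes;
  }

  public static void main(String[] args) {
    // limit(100) with rowCountLimit(75): one full batch, one trimmed
    // batch, matching the 75- and 25-row batches in testLimit100.
    assert batchSizes(100, 75).equals(List.of(75, 25));
    // limit(1): a single one-row batch, as in testLimit1.
    assert batchSizes(1, 75).equals(List.of(1));
  }
}
```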
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
index 847c953..c68bdae 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
@@ -77,7 +77,6 @@ import org.junit.experimental.categories.Category;
* the structure. The object tree will show all the components and their
* current state.
*/
-
@Category(RowSetTests.class)
public class TestResultSetLoaderProtocol extends SubOperatorTest {
@@ -317,7 +316,6 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
 * additional information. The code here simply uses the <tt>MaterializedField</tt>
* to create a <tt>ColumnMetadata</tt> implicitly.
*/
-
@Test
public void testCaseInsensitiveSchema() {
ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
@@ -573,7 +571,6 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
* Also verifies the test-time method to set a row of values using
* a single method.
*/
-
@Test
public void testInitialSchema() {
TupleMetadata schema = new SchemaBuilder()
@@ -606,7 +603,7 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
/**
* The writer protocol allows a client to write to a row any number of times
- * before invoking <tt>save()</tt>. In this case, each new value simply
+ * before invoking {@code save()}. In this case, each new value simply
 * overwrites the previous value. Here, we test the most basic case: a simple,
* flat tuple with no arrays. We use a very large Varchar that would, if
* overwrite were not working, cause vector overflow.
@@ -628,7 +625,6 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
* Note that there is no explicit method to discard a row. Instead,
* the rule is that a row is not saved until <tt>save()</tt> is called.
*/
-
@Test
public void testOverwriteRow() {
TupleMetadata schema = new SchemaBuilder()
@@ -685,7 +681,6 @@ public class TestResultSetLoaderProtocol extends SubOperatorTest {
* Test that memory is released if the loader is closed with an active
* batch (that is, before the batch is harvested.)
*/
-
@Test
public void testCloseWithoutHarvest() {
TupleMetadata schema = new SchemaBuilder()
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
index d3b2f85..f137a24 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
* array. Instead, the order of calls, or selectively making or not making
* calls, can change.
*/
-
public interface WriterEvents extends WriterPosition {
/**
@@ -47,7 +46,6 @@ public interface WriterEvents extends WriterPosition {
* operations to newly-added columns to synchronize them with the rest
* of the writers.
*/
-
enum State {
/**
@@ -57,20 +55,18 @@ public interface WriterEvents extends WriterPosition {
IDLE,
/**
- * <tt>startWrite()</tt> has been called to start a write operation
- * (start a batch), but <tt>startValue()</tt> has not yet been called
- * to start a row (or value within an array). <tt>startWrite()</tt> must
+ * {@code startWrite()} has been called to start a write operation
+ * (start a batch), but {@code startValue()} has not yet been called
+ * to start a row (or value within an array). {@code startWrite()} must
* be called on newly added columns.
*/
-
IN_WRITE,
/**
- * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on
+ * Both {@code startWrite()} and {@code startValue()} have been called on
* the tuple to prepare for writing values, and both must be called on
* newly-added vectors.
*/
-
IN_ROW
}
@@ -80,7 +76,6 @@ public interface WriterEvents extends WriterPosition {
* listener is bound, and a vector overflows, then an exception is
* thrown.
*/
-
interface ColumnWriterListener {
/**
@@ -90,7 +85,6 @@ public interface WriterEvents extends WriterPosition {
*
* @param writer the writer that triggered the overflow
*/
-
void overflowed(ScalarWriter writer);
/**
@@ -102,9 +96,8 @@ public interface WriterEvents extends WriterPosition {
* @param delta the amount by which the vector is to grow
* @return true if the vector can be grown, false if the writer
* should instead trigger an overflow by calling
- * <tt>overflowed()</tt>
+ * {@code overflowed()}
*/
-
boolean canExpand(ScalarWriter writer, int delta);
}
@@ -114,7 +107,6 @@ public interface WriterEvents extends WriterPosition {
* @param index the writer index (top level or nested for
* arrays)
*/
-
void bindIndex(ColumnWriterIndex index);
/**
@@ -127,21 +119,18 @@ public interface WriterEvents extends WriterPosition {
* @param listener
* the vector event listener to bind
*/
-
void bindListener(ColumnWriterListener listener);
/**
* Start a write (batch) operation. Performs any vector initialization
* required at the start of a batch (especially for offset vectors.)
*/
-
void startWrite();
/**
* Start a new row. To be called only when a row is not active. To
* restart a row, call {@link #restartRow()} instead.
*/
-
void startRow();
/**
@@ -152,7 +141,6 @@ public interface WriterEvents extends WriterPosition {
* offset vector based on the cumulative value saves is done when
* saving the row.
*/
-
void endArrayValue();
/**
@@ -161,27 +149,23 @@ public interface WriterEvents extends WriterPosition {
* Done when abandoning the current row, such as when filtering out
* a row at read time.
*/
-
void restartRow();
/**
* Saves a row. Commits offset vector locations and advances each to
* the next position. Can be called only when a row is active.
*/
-
void saveRow();
/**
* End a batch: finalize any vector values.
*/
-
void endWrite();
/**
* The vectors backing this vector are about to roll over. Finish
* the current batch up to, but not including, the current row.
*/
-
void preRollover();
/**
@@ -191,7 +175,6 @@ public interface WriterEvents extends WriterPosition {
* for the current row now resides at the start of a new vector instead
* of its previous location elsewhere in an old vector.
*/
-
void postRollover();
abstract void dump(HierarchicalFormatter format);