This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new efb231d  DRILL-8115: Adds LIMIT pushdown support in the scan framework 
(#2441)
efb231d is described below

commit efb231ddb5a1f09e7a4431cab48e6328292d0e3f
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Feb 1 01:54:24 2022 -0800

    DRILL-8115: Adds LIMIT pushdown support in the scan framework (#2441)
    
    * Adds LIMIT pushdown support in the scan framework
    
    Modifies EVF and the scan framework to enforce LIMIT
    pushdown across readers.
    
    * Fix build issues
    
    * Fixes from review comments
---
 .../exec/physical/impl/scan/RowBatchReader.java    |  62 ++--
 .../physical/impl/scan/ScanOperatorEvents.java     |   4 -
 .../exec/physical/impl/scan/ScanOperatorExec.java  |  24 +-
 .../impl/scan/framework/ManagedScanFramework.java  |   3 +
 .../impl/scan/framework/SchemaNegotiator.java      |   6 +
 .../impl/scan/framework/SchemaNegotiatorImpl.java  |   6 +
 .../impl/scan/framework/ShimBatchReader.java       |  27 +-
 .../scan/project/ReaderSchemaOrchestrator.java     |  27 +-
 .../impl/scan/project/ScanSchemaOrchestrator.java  |  29 +-
 .../impl/scan/v3/ScanLifecycleBuilder.java         |  17 ++
 .../impl/scan/v3/lifecycle/ReaderLifecycle.java    |  39 ++-
 .../impl/scan/v3/lifecycle/ScanLifecycle.java      |  18 +-
 .../scan/v3/lifecycle/SchemaNegotiatorImpl.java    |   5 +
 .../exec/physical/resultSet/ResultSetLoader.java   |  23 +-
 .../exec/physical/resultSet/RowSetLoader.java      |   1 +
 .../resultSet/impl/ResultSetLoaderImpl.java        |  72 ++++-
 .../resultSet/impl/ResultSetOptionBuilder.java     |  10 +
 .../physical/resultSet/impl/RowSetLoaderImpl.java  |   3 +
 .../physical/resultSet/impl/WriterIndexImpl.java   |   2 +-
 .../exec/store/mock/ExtendedMockBatchReader.java   |  13 +-
 .../impl/scan/BaseScanOperatorExecTest.java        |   7 +-
 .../physical/impl/scan/TestScanOperExecBasics.java |   1 -
 .../impl/scan/TestScanOperExecEarlySchema.java     |   1 -
 .../physical/impl/scan/TestScanOperExecLimit.java  | 270 +++++++++++++++++
 .../impl/scan/TestScanOperExecOverflow.java        |   3 -
 .../exec/physical/impl/scan/v3/TestScanLimit.java  | 277 ++++++++++++++++++
 .../scan/v3/lifecycle/BaseTestScanLifecycle.java   |   2 +
 .../scan/v3/lifecycle/TestScanLifecycleLimit.java  | 321 +++++++++++++++++++++
 .../resultSet/impl/TestResultSetLoaderLimits.java  | 109 ++++++-
 .../impl/TestResultSetLoaderProtocol.java          |   7 +-
 .../exec/vector/accessor/writer/WriterEvents.java  |  27 +-
 31 files changed, 1273 insertions(+), 143 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
index 61de584..b52b116 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/RowBatchReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.record.VectorContainer;
  * vectors across batches, etc.
  * <p>
  * Note that this interface reads a <b>batch</b> of rows, not
- * a single row. (The original <tt>RecordReader</tt> could be
+ * a single row. (The original {@code RecordReader} could be
  * confusing in this aspect.)
  * <p>
  * The expected lifecycle is:
@@ -49,15 +49,15 @@ import org.apache.drill.exec.record.VectorContainer;
  * is late-schema), return null for the output and -1 for the schema
  * version. Else, return non-negative for the version number and
  * an empty batch with a schema.</li>
- * <li>{@link #next()}</tt> to retrieve the next record batch.
+ * <li>{@link #next()} to retrieve the next record batch.
  * Return true if a batch is available, false if EOF. There is no
- * requirement to return a batch; the first call to <tt>next()</tt>
- * can return <tt>false</tt> if no data is available.</tt>
+ * requirement to return a batch; the first call to {@code next()}
+ * can return {@code false} if no data is available.</li>
  * <li>{@link #output()} and {@link #schemaVersion()} to obtain the
  * batch of records read, and to detect if the version of the schema
  * is different from the previous batch.</li>
  * <li>{@link #close()} when the reader is no longer needed. This
- * may occur before <tt>next()</tt> returns <tt>false</tt> if an
+ * may occur before {@code next()} returns {@code false} if an
  * error occurs or a limit is reached.</li>
  * </ul>
  * Although the reader should not care, the scanner must return an
@@ -65,8 +65,8 @@ import org.apache.drill.exec.record.VectorContainer;
  * The scan operator handles this transparently:
  * <ul>
  * <li>If the reader is early-schema, then the reader itself returns
- * an empty batch (via <tt>output()</tt), with only schema after the call
- * to <tt>open()</tt>. The scanner sends this empty batch downstream
+ * an empty batch (via {@code output()}), with only schema after the call
+ * to {@code open()}. The scanner sends this empty batch downstream
  * directly.</li>
  * <li>If the reader is late-schema, then the reader will read the first
  * batch from the reader. It will save that batch, extract the schema,
@@ -81,7 +81,6 @@ import org.apache.drill.exec.record.VectorContainer;
  * from any method. A {@link UserException} is preferred to provide
  * detailed information about the source of the problem.
  */
-
 public interface RowBatchReader {
 
   /**
@@ -89,12 +88,11 @@ public interface RowBatchReader {
    *
    * @return display name for errors
    */
-
   String name();
 
   /**
    * Setup the record reader. Called just before the first call
-   * to <tt>next()</tt>. Allocate resources here, not in the constructor.
+   * to {@code next()}. Allocate resources here, not in the constructor.
    * Example: open files, allocate buffers, etc.
    *
    * @return true if the reader is open and ready to read (possibly no)
@@ -102,10 +100,9 @@ public interface RowBatchReader {
    * but the scanner should not fail, it should move onto another reader
    *
    * @throws RuntimeException for "hard" errors that should terminate
-   * the query. <tt>UserException</tt> preferred to explain the problem
+   * the query. {@code UserException} preferred to explain the problem
    * better than the scan operator can by guessing at the cause
    */
-
   boolean open();
 
   /**
@@ -121,17 +118,16 @@ public interface RowBatchReader {
    * @return true if this reader can (and has) defined an empty batch
    * to describe the schema, false otherwise
    */
-
   boolean defineSchema();
 
   /**
    * Read the next batch. Reading continues until either EOF,
    * or until the mutator indicates that the batch is full.
    * The batch is considered valid if it is non-empty. Returning
-   * <tt>true</tt> with an empty batch is valid, and is helpful on
+   * {@code true} with an empty batch is valid, and is helpful on
    * the very first batch (returning schema only.) An empty batch
-   * with a <tt>false</tt> return code indicates EOF and the batch
-   * will be discarded. A non-empty batch along with a <tt>false</tt>
+   * with a {@code false} return code indicates EOF and the batch
+   * will be discarded. A non-empty batch along with a {@code false}
    * return result indicates a final, valid batch, but that EOF was
    * reached and no more data is available.
    * <p>
@@ -139,14 +135,13 @@ public interface RowBatchReader {
    * final batch just to find out that no more data is available;
    * it allows EOF to be returned along with the final batch.
    *
-   * @return <tt>true</tt> if more data may be available (and so
-   * <tt>next()</tt> should be called again, <tt>false</tt> to indicate
+   * @return {@code true} if more data may be available (and so
+   * {@code next()} should be called again, {@code false} to indicate
    * that EOF was reached
    *
-   * @throws RuntimeException (<tt>UserException</tt> preferred) if an
+   * @throws RuntimeException ({@code UserException} preferred) if an
    * error occurs that should fail the query.
    */
-
   boolean next();
 
   /**
@@ -158,7 +153,7 @@ public interface RowBatchReader {
    * empty batch with the schema set. The scanner will return this schema
    * downstream to inform other operators of the schema.</li>
    * <li>Directly after a successful call to {@link next()} to retrieve
-   * the batch produced by that call. (No call is made if <tt>next()</tt>
+   * the batch produced by that call. (No call is made if {@code next()}
    * returns false.</li>
    * </ul>
    * Note that most operators require the same vectors be present in
@@ -166,10 +161,9 @@ public interface RowBatchReader {
    * container, and same set of vectors, on each call.
    *
    * @return a vector container, with the record count and schema
-   * set, that announces the schema after <tt>open()</tt> (optional)
-   * or returns rows read after <tt>next()</tt> (required)
+   * set, that announces the schema after {@code open()} (optional)
+   * or returns rows read after {@code next()} (required)
    */
-
   VectorContainer output();
 
   /**
@@ -183,8 +177,8 @@ public interface RowBatchReader {
    * <li>The number increases if a batch has a different schema than the
    * previous batch.</li>
    * </ul>
-   * Numbers increment (or not) on calls to <tt>next()</tt>. Thus Two 
successive
-   * calls to this method should return the same number if no <tt>next()</tt>
+   * Numbers increment (or not) on calls to {@code next()}. Thus two successive
+   * calls to this method should return the same number if no {@code next()}
    * call lies between.
    * <p>
    * If the reader can return a schema on open (so-called "early-schema), then
@@ -192,29 +186,27 @@ public interface RowBatchReader {
    * happens to be empty (such as reading an empty file.)
    * <p>
    * However, if the reader cannot return a schema on open (so-called "late
-   * schema"), then this method must return -1 (and <tt>output()</tt> must
+   * schema"), then this method must return -1 (and {@code output()} must
    * return null) to indicate now schema is available when called before the
-   * first call to <tt>next()</tt>.
+   * first call to {@code next()}.
    * <p>
-   * No calls will be made to this method before <tt>open()</tt> after
-   * <tt>close()<tt> or after <tt>next()</tt> returns false. The implementation
+   * No calls will be made to this method before {@code open()}, after
+   * {@code close()} or after {@code next()} returns false. The implementation
    * is thus not required to handle these cases.
    *
    * @return the schema version, or -1 if no schema version is yet available
    */
-
   int schemaVersion();
 
   /**
    * Release resources. Called just after a failure, when the scanner
-   * is cancelled, or after <tt>next()</tt> returns EOF. Release
+   * is cancelled, or after {@code next()} returns EOF. Release
    * all resources and close files. Guaranteed to be called if
-   * <tt>open()</tt> returns normally; will not be called if <tt>open()</tt>
+   * {@code open()} returns normally; will not be called if {@code open()}
    * throws an exception.
    *
-   * @throws RutimeException (<tt>UserException</tt> preferred) if an
+   * @throws RuntimeException ({@code UserException} preferred) if an
    * error occurs that should fail the query.
    */
-
   void close();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
index 04b2c7e..9aceb32 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorEvents.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.ops.OperatorContext;
  * across scanners. See {@link ScanSchemaOrchestrator} for the managed
  * framework.
  */
-
 public interface ScanOperatorEvents {
 
   /**
@@ -51,7 +50,6 @@ public interface ScanOperatorEvents {
    *
    * @param context the operator context for the scan operator
    */
-
   void bind(OperatorContext context);
 
   /**
@@ -67,13 +65,11 @@ public interface ScanOperatorEvents {
    * @return a batch reader for one of the scan elements within the
    * scan physical plan for this scan operator
    */
-
   RowBatchReader nextReader();
 
   /**
    * Called when the scan operator itself is closed. Indicates that no more
    * readers are available.
    */
-
   void close();
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
index 699eb1f..e1038d5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
@@ -32,6 +32,24 @@ import org.slf4j.LoggerFactory;
  * Implementation of the revised scan operator that uses a mutator aware of
  * batch sizes. This is the successor to {@link ScanBatch} and should be used
  * by all new scan implementations.
+ * <p>
+ * The basic concept is to split the scan operator into layers:
+ * <ul>
+ * <li>The {@code OperatorRecordBatch} which implements Drill's Volcano-like
+ * protocol.</li>
+ * <li>The scan operator "wrapper" (this class) which implements actions for 
the
+ * operator record batch specifically for scan. It iterates over readers,
+ * delegating semantic work to other classes.</li>
+ * <li>The implementation of per-reader semantics in the two EVF versions and
+ * other ad-hoc implementations.</li>
+ * <li>The result set loader and related classes which pack values into
+ * value vectors.</li>
+ * <li>Value vectors, which store the data.</li>
+ * </ul>
+ * <p>
+ * The layered format can be confusing. However, each layer is somewhat
+ * complex, so dividing the work into layers keeps the overall complexity
+ * somewhat under control.
  *
  * <h4>Scanner Framework</h4>
  *
@@ -116,13 +134,11 @@ public class ScanOperatorExec implements OperatorExec {
      * The scan has been started, but next() has not yet been
      * called.
      */
-
     START,
 
     /**
      * A reader is active and has more batches to deliver.
      */
-
     READER,
 
     /**
@@ -131,14 +147,13 @@ public class ScanOperatorExec implements OperatorExec {
      * was returned from next(). The next call to next() will
      * be the last.
      */
-
     EMPTY,
 
     /**
      * All readers are complete; no more batches to deliver.
+     * Or, hit the limit pushed down to the scan.
      * close() is not yet called.
      */
-
     END,
 
     /**
@@ -146,7 +161,6 @@ public class ScanOperatorExec implements OperatorExec {
      * states. No further calls to next() allowed. Waiting
      * for the call to close().
      */
-
     FAILED,
 
     /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
index 860912c..dc456db 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java
@@ -193,6 +193,9 @@ public class ManagedScanFramework implements 
ScanOperatorEvents {
 
   @Override
   public RowBatchReader nextReader() {
+    if (scanOrchestrator.atLimit()) {
+      return null;
+    }
     ManagedReader<? extends SchemaNegotiator> reader = readerFactory.next();
     return reader == null ? null : new ShimBatchReader(this, reader);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
index b2a793d..0a112b5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiator.java
@@ -165,6 +165,12 @@ public interface SchemaNegotiator {
   void batchSize(int maxRecordsPerBatch);
 
   /**
+   * Push down a LIMIT into the scan. This is a per-reader limit,
+   * not an overall scan limit.
+   */
+  void limit(long limit);
+
+  /**
    * Build the schema, plan the required projections and static
    * columns and return a loader used to populate value vectors.
    * If the select list includes a subset of table columns, then
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
index 653df53..5139ca7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/SchemaNegotiatorImpl.java
@@ -64,6 +64,7 @@ public class SchemaNegotiatorImpl implements SchemaNegotiator 
{
   protected TupleMetadata tableSchema;
   protected boolean isSchemaComplete;
   protected int batchSize = ValueVector.MAX_ROW_COUNT;
+  protected long limit = -1;
 
   public SchemaNegotiatorImpl(ManagedScanFramework framework) {
     this.framework = framework;
@@ -138,6 +139,11 @@ public class SchemaNegotiatorImpl implements 
SchemaNegotiator {
     return framework.builder.userName;
   }
 
+  @Override
+  public void limit(long limit) {
+    this.limit = limit;
+  }
+
   /**
    * Callback from the schema negotiator to build the schema from information 
from
    * both the table and scan operator. Returns the result set loader to be used
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
index 3d750c6..487950c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ShimBatchReader.java
@@ -54,7 +54,8 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
    */
   private boolean eof;
 
-  public ShimBatchReader(ManagedScanFramework manager, ManagedReader<? extends 
SchemaNegotiator> reader) {
+  public ShimBatchReader(ManagedScanFramework manager,
+      ManagedReader<? extends SchemaNegotiator> reader) {
     this.framework = manager;
     this.reader = reader;
     readerOrchestrator = manager.scanOrchestrator().startReader();
@@ -72,7 +73,7 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
 
     // Build and return the result set loader to be used by the reader.
 
-    if (! framework.open(this)) {
+    if (!framework.open(this)) {
 
       // If we had a soft failure, then there should be no schema.
       // The reader should not have negotiated one. Not a huge
@@ -113,13 +114,21 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
     // The reader may report EOF, but the result set loader might
     // have a lookahead row.
 
-    if (eof && ! tableLoader.hasRows()) {
+    if (eof && !tableLoader.hasRows()) {
+      return false;
+    }
+
+    // Hit the per-reader limit?
+    if (tableLoader.atLimit()) {
       return false;
     }
 
     // Prepare for the batch.
 
-    readerOrchestrator.startBatch();
+    if (!readerOrchestrator.startBatch()) {
+      eof = true;
+      return false;
+    }
 
     // Read the batch. The reader should report EOF if it hits the
     // end of data, even if the reader returns rows. This will prevent 
allocating
@@ -127,8 +136,8 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
     // already reported EOF. In that case, we're just processing any last
     // lookahead row in the result set loader.
 
-    if (! eof) {
-      eof = ! reader.next();
+    if (!eof) {
+      eof = !reader.next();
     }
 
     // Add implicit columns, if any.
@@ -136,13 +145,13 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
     // Having a correct row count, even if 0, is important to
     // the scan operator.
 
-    readerOrchestrator.endBatch(eof);
+    eof = readerOrchestrator.endBatch(eof);
 
     // Return EOF (false) only when the reader reports EOF
     // and the result set loader has drained its rows from either
     // this batch or lookahead rows.
 
-    return ! eof || tableLoader.hasRows();
+    return !eof || tableLoader.hasRows();
   }
 
   @Override
@@ -197,7 +206,7 @@ public class ShimBatchReader implements RowBatchReader, 
NegotiatorListener {
     this.schemaNegotiator = schemaNegotiator;
     readerOrchestrator.setBatchSize(schemaNegotiator.batchSize);
     tableLoader = 
readerOrchestrator.makeTableLoader(schemaNegotiator.errorContext(),
-        schemaNegotiator.tableSchema);
+        schemaNegotiator.tableSchema, schemaNegotiator.limit);
     return tableLoader;
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
index 56fb481..6c9d3fe 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ReaderSchemaOrchestrator.java
@@ -42,6 +42,7 @@ import java.util.List;
 public class ReaderSchemaOrchestrator implements VectorSource {
 
   private final ScanSchemaOrchestrator scanOrchestrator;
+  private long limit;
   private int readerBatchSize;
   private ResultSetLoaderImpl tableLoader;
   private int prevTableSchemaVersion = -1;
@@ -55,8 +56,10 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
   private ResolvedRow rootTuple;
   private VectorContainer tableContainer;
 
-  public ReaderSchemaOrchestrator(ScanSchemaOrchestrator 
scanSchemaOrchestrator) {
+  public ReaderSchemaOrchestrator(
+      ScanSchemaOrchestrator scanSchemaOrchestrator, long limit) {
     scanOrchestrator = scanSchemaOrchestrator;
+    this.limit = limit;
     readerBatchSize = scanOrchestrator.options.scanBatchRecordLimit;
   }
 
@@ -68,10 +71,11 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
 
   @VisibleForTesting
   public ResultSetLoader makeTableLoader(TupleMetadata readerSchema) {
-    return makeTableLoader(scanOrchestrator.scanProj.context(), readerSchema);
+    return makeTableLoader(scanOrchestrator.scanProj.context(), readerSchema, 
-1);
   }
 
-  public ResultSetLoader makeTableLoader(CustomErrorContext errorContext, 
TupleMetadata readerSchema) {
+  public ResultSetLoader makeTableLoader(CustomErrorContext errorContext,
+      TupleMetadata readerSchema, long localLimit) {
     ResultSetOptionBuilder options = new ResultSetOptionBuilder();
     options.rowCountLimit(Math.min(readerBatchSize, 
scanOrchestrator.options.scanBatchRecordLimit));
     options.vectorCache(scanOrchestrator.vectorCache);
@@ -86,6 +90,12 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
     // adds a column later.
     options.projectionFilter(scanOrchestrator.scanProj.readerProjection);
     options.readerSchema(readerSchema);
+    if (limit < 0) {
+      limit = localLimit;
+    } else if (localLimit >= 0) {
+      limit = Math.min(localLimit, limit);
+    }
+    options.limit(limit);
 
     // Create the table loader
     tableLoader = new ResultSetLoaderImpl(scanOrchestrator.allocator, 
options.build());
@@ -101,8 +111,8 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
     endBatch();
   }
 
-  public void startBatch() {
-    tableLoader.startBatch();
+  public boolean startBatch() {
+    return tableLoader.startBatch();
   }
 
   /**
@@ -121,7 +131,7 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
    *
    * @param eof is end of file
    */
-  public void endBatch(boolean eof) {
+  public boolean endBatch(boolean eof) {
 
     // Get the batch results in a container.
     tableContainer = tableLoader.harvest();
@@ -140,7 +150,10 @@ public class ReaderSchemaOrchestrator implements 
VectorSource {
     if (projected) {
       projectMetadata(false);
     }
-    rootTuple.setRowCount(tableContainer.getRecordCount());
+    int rowCount = tableContainer.getRecordCount();
+    rootTuple.setRowCount(rowCount);
+    scanOrchestrator.tallyBatch(rowCount);
+    return eof || tableLoader.atLimit();
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
index b1b3398..4f76132 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ScanSchemaOrchestrator.java
@@ -208,6 +208,11 @@ public class ScanSchemaOrchestrator {
     private CustomErrorContext errorContext;
 
     /**
+     * Pushed-down scan LIMIT.
+     */
+    private long limit = -1;
+
+    /**
      * Specify an optional metadata manager. Metadata is a set of constant
      * columns with per-reader values. For file-based sources, this is usually
      * the implicit and partition columns; but it could be other items for 
other
@@ -291,6 +296,12 @@ public class ScanSchemaOrchestrator {
       return providedSchema;
     }
 
+    public void limit(long limit) {
+      this.limit = Math.max(-1, limit);
+    }
+
+    public long limit() { return limit; }
+
     public void errorContext(CustomErrorContext context) {
       this.errorContext = context;
     }
@@ -301,8 +312,7 @@ public class ScanSchemaOrchestrator {
 
     @VisibleForTesting
     public ScanOperatorExec buildScan() {
-      return new ScanOperatorExec(buildEvents(),
-          ! disableEmptyResults);
+      return new ScanOperatorExec(buildEvents(), !disableEmptyResults);
     }
 
     public OperatorRecordBatch buildScanOperator(FragmentContext fragContext, 
PhysicalOperator pop) {
@@ -335,6 +345,7 @@ public class ScanSchemaOrchestrator {
     public final boolean useSchemaSmoothing;
     public final boolean allowRequiredNullColumns;
     public final TupleMetadata providedSchema;
+    public final long limit;
 
     /**
      * Context for error messages.
@@ -352,6 +363,7 @@ public class ScanSchemaOrchestrator {
       context = builder.errorContext;
       providedSchema = builder.providedSchema;
       allowRequiredNullColumns = builder.allowRequiredNullColumns;
+      limit = builder.limit < 0 ? Long.MAX_VALUE : builder.limit;
     }
 
     protected TupleMetadata providedSchema() {
@@ -383,6 +395,8 @@ public class ScanSchemaOrchestrator {
   protected final ScanLevelProjection scanProj;
   private ReaderSchemaOrchestrator currentReader;
   protected final SchemaSmoother schemaSmoother;
+  protected int batchCount;
+  protected long rowCount;
 
   // Output
 
@@ -435,7 +449,7 @@ public class ScanSchemaOrchestrator {
 
   public ReaderSchemaOrchestrator startReader() {
     closeReader();
-    currentReader = new ReaderSchemaOrchestrator(this);
+    currentReader = new ReaderSchemaOrchestrator(this, options.limit - 
rowCount);
     return currentReader;
   }
 
@@ -458,6 +472,15 @@ public class ScanSchemaOrchestrator {
     return outputContainer;
   }
 
+  public void tallyBatch(int rowCount) {
+    this.batchCount++;
+    this.rowCount += rowCount;
+  }
+
+  public boolean atLimit() {
+    return batchCount > 0 && rowCount >= options.limit;
+  }
+
   public void closeReader() {
     if (currentReader != null) {
       currentReader.close();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
index 393fc23..c3bd1f6 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/ScanLifecycleBuilder.java
@@ -153,6 +153,11 @@ public class ScanLifecycleBuilder {
    */
   protected CustomErrorContext errorContext;
 
+  /**
+   * Pushed-down scan LIMIT.
+   */
+  private long limit = Long.MAX_VALUE;
+
   public void options(OptionSet options) {
     this.options = options;
   }
@@ -288,6 +293,18 @@ public class ScanLifecycleBuilder {
 
   public SchemaValidator schemaValidator() { return schemaValidator; }
 
+  public void limit(long limit) {
+    // Operator definitions use -1 for "no limit"; this mechanism
+    // uses a very big number for "no limit."
+    if (limit < 0) {
+      this.limit = Long.MAX_VALUE;
+    } else {
+      this.limit = limit;
+    }
+  }
+
+  public long limit() { return limit; }
+
   public ScanLifecycle build(OperatorContext context) {
     return new ScanLifecycle(context, this);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
index f322f51..4cb5ad8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ReaderLifecycle.java
@@ -94,9 +94,10 @@ import org.slf4j.LoggerFactory;
 public class ReaderLifecycle implements RowBatchReader {
   private static final Logger logger = 
LoggerFactory.getLogger(ReaderLifecycle.class);
 
-  private enum State { START, DATA, FINAL, EOF }
+  private enum State { START, DATA, FINAL, LIMIT, EOF }
 
   private final ScanLifecycle scanLifecycle;
+  private final long limit;
   protected final TupleMetadata readerInputSchema;
   private ManagedReader reader;
   private final SchemaNegotiatorImpl schemaNegotiator;
@@ -107,8 +108,9 @@ public class ReaderLifecycle implements RowBatchReader {
   private OutputBatchBuilder outputBuilder;
   private State state = State.START;
 
-  public ReaderLifecycle(ScanLifecycle scanLifecycle) {
+  public ReaderLifecycle(ScanLifecycle scanLifecycle, long limit) {
     this.scanLifecycle = scanLifecycle;
+    this.limit = limit;
     this.readerInputSchema = schemaTracker().readerInputSchema();
     this.schemaNegotiator = scanLifecycle.newNegotiator(this);
   }
@@ -127,6 +129,9 @@ public class ReaderLifecycle implements RowBatchReader {
 
   @Override
   public String name() {
+    if (reader == null) {
+      return getClass().getSimpleName();
+    }
     return reader.getClass().getSimpleName();
   }
 
@@ -164,13 +169,15 @@ public class ReaderLifecycle implements RowBatchReader {
 
   public ResultSetLoader buildLoader() {
     Preconditions.checkState(state == State.START);
+    ScanLifecycleBuilder scanOptions = scanOptions();
     ResultSetOptionBuilder options = new ResultSetOptionBuilder()
-        .rowCountLimit(Math.min(schemaNegotiator.batchSize, 
scanOptions().scanBatchRecordLimit()))
+        .rowCountLimit(Math.min(schemaNegotiator.batchSize, 
scanOptions.scanBatchRecordLimit()))
         .vectorCache(scanLifecycle.vectorCache())
-        .batchSizeLimit(scanOptions().scanBatchByteLimit())
+        .batchSizeLimit(scanOptions.scanBatchByteLimit())
         .errorContext(errorContext())
         .projectionFilter(schemaTracker().projectionFilter(errorContext()))
-        .readerSchema(schemaNegotiator.readerSchema);
+        .readerSchema(schemaNegotiator.readerSchema)
+        .limit(limit);
 
     // Resolve the scan schema if possible.
     applyEarlySchema();
@@ -214,8 +221,15 @@ public class ReaderLifecycle implements RowBatchReader {
 
     // The reader may report EOF, but the result set loader might
     // have a lookahead row.
-    if (state == State.EOF) {
+    switch (state) {
+    case EOF:
       return false;
+    case LIMIT:
+      outputBuilder = null;
+      state = State.EOF;
+      return false;
+    default:
+      break;
     }
 
     // Prepare for the batch.
@@ -226,10 +240,16 @@ public class ReaderLifecycle implements RowBatchReader {
     // a new batch just to learn about EOF. Don't read if the reader
     // already reported EOF. In that case, we're just processing any last
     // lookahead row in the result set loader.
+    //
+    // If the scan has hit its pushed-down limit, then the reader might
+    // return EOF, or it might remain blissfully ignorant about why the
+    // batch was full. Double-check the limit here.
     if (state == State.DATA) {
       try {
         if (!reader.next()) {
           state = State.FINAL;
+        } else if (tableLoader.atLimit()) {
+          state = State.LIMIT;
         }
       } catch (UserException e) {
         throw e;
@@ -282,8 +302,8 @@ public class ReaderLifecycle implements RowBatchReader {
     if (tableLoader.batchCount() == 1 || prevTableSchemaVersion < 
tableLoader.schemaVersion()) {
       reviseOutputProjection(tableLoader.outputSchema());
     }
-    buildOutputBatch(readerOutput);
-    scanLifecycle.tallyBatch();
+    int rowCount = buildOutputBatch(readerOutput);
+    scanLifecycle.tallyBatch(rowCount);
   }
 
   /**
@@ -337,7 +357,7 @@ public class ReaderLifecycle implements RowBatchReader {
         .nullType(scanOptions().nullType());
   }
 
-  private void buildOutputBatch(VectorContainer readerContainer) {
+  private int buildOutputBatch(VectorContainer readerContainer) {
 
     // Create the implicit columns loader loader after the first
     // batch so we can report if the file is empty.
@@ -358,6 +378,7 @@ public class ReaderLifecycle implements RowBatchReader {
       createOutputBuilder();
     }
     outputBuilder.load(rowCount);
+    return rowCount;
   }
 
   private void createOutputBuilder() {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
index 3261a05..b77f0cf 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/ScanLifecycle.java
@@ -136,6 +136,7 @@ public class ScanLifecycle {
   private final ScanSchemaTracker schemaTracker;
   private final ReaderFactory<?> readerFactory;
   private int batchCount;
+  private long rowCount;
 
   /**
    * Cache used to preserve the same vectors from one output batch to the
@@ -171,15 +172,24 @@ public class ScanLifecycle {
   public boolean hasOutputSchema() { return schemaTracker.isResolved(); }
   public CustomErrorContext errorContext() { return options.errorContext(); }
   public BufferAllocator allocator() { return context.getAllocator(); }
-  public void tallyBatch() { batchCount++; }
   public int batchCount() { return batchCount; }
+  public long rowCount() { return rowCount; }
+
+  public void tallyBatch(int rowCount) {
+    batchCount++;
+    this.rowCount += rowCount;
+  }
 
   public RowBatchReader nextReader() {
-    if (readerFactory.hasNext()) {
-      return new ReaderLifecycle(this);
-    } else {
+    // Check limit. But, do at least one (zero row) batch
+    // to capture schema.
+    if (batchCount > 0 && rowCount >= options.limit()) {
+      return null;
+    }
+    if (!readerFactory.hasNext()) {
       return null;
     }
+    return new ReaderLifecycle(this, options.limit() - rowCount);
   }
 
   protected SchemaNegotiatorImpl newNegotiator(ReaderLifecycle 
readerLifecycle) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
index 24ad466..535a758 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/SchemaNegotiatorImpl.java
@@ -53,6 +53,11 @@ import org.apache.drill.exec.vector.ValueVector;
  * a vector written by the reader, in the second, it is a null column
  * filled in by the scan projector (assuming, of course, that "c"
  * is nullable or an array.)
+ * <p>
+ * Pushes limits into the result set loader. The caller must
+ * either check the return value from {@code startBatch()}, or
+ * call {@code atLimit()} after {@code harvest()} to detect when the scan
+ * has reached the limit. Treat the limit condition the same as EOF.
  */
 public class SchemaNegotiatorImpl implements SchemaNegotiator {
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
index cf44229..35c24b2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/ResultSetLoader.java
@@ -71,6 +71,12 @@ public interface ResultSetLoader {
   int targetRowCount();
 
   /**
+   * The maximum number of rows for the present batch. Will be the lesser
+   * of the {@link #targetRowCount()} and the overall scan limit remaining.
+   */
+  int maxBatchSize();
+
+  /**
    * The largest vector size produced by this loader (as specified by
    * the value vector limit.)
    *
@@ -94,7 +100,7 @@ public interface ResultSetLoader {
    * current batch.
    * @return total row count
    */
-  int totalRowCount();
+  long totalRowCount();
 
   /**
    * Report whether the loader currently holds rows. If within a batch,
@@ -109,8 +115,11 @@ public interface ResultSetLoader {
   /**
    * Start a new row batch. Valid only when first started, or after the
    * previous batch has been harvested.
+   *
+   * @return {@code true} if another batch can be read, {@code false} if
+   * the reader has reached the given scan limit.
    */
-  void startBatch();
+  boolean startBatch();
 
   /**
    * Writer for the top-level tuple (the entire row). Valid only when
@@ -229,6 +238,16 @@ public interface ResultSetLoader {
   VectorContainer harvest();
 
   /**
+   * After a {@link #harvest()} call, call this method to determine if
+   * the scan limit has been hit. If so, treat this as the final batch
+   * for the reader, even if more data is available to read.
+   *
+   * @return {@code true} if the scan has reached a set scan row limit,
+   * {@code false} if there is no limit, or more rows can be read.
+   */
+  boolean atLimit();
+
+  /**
    * The schema of the harvested batch. Valid until the start of the
    * next batch.
    *
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
index 462ad2d..f7d29df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/RowSetLoader.java
@@ -114,6 +114,7 @@ public interface RowSetLoader extends TupleWriter {
    * @param maxRecords Maximum rows to be returned. (From the limit clause of 
the query)
    * @return True if the row count exceeds the maxRecords, false if not.
    */
+  @Deprecated() // use the limit in options instead
   boolean limitReached(int maxRecords);
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index ac10a18..48e0f07 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -52,6 +52,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, 
LoaderInternals {
     protected final ProjectionFilter projectionSet;
     protected final TupleMetadata schema;
     protected final long maxBatchSize;
+    protected final long scanLimit;
 
     /**
      * Context for error messages.
@@ -66,6 +67,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, 
LoaderInternals {
       schema = null;
       maxBatchSize = -1;
       errorContext = null;
+      scanLimit = Long.MAX_VALUE;
     }
 
     public ResultSetOptions(ResultSetOptionBuilder builder) {
@@ -74,6 +76,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, 
LoaderInternals {
       vectorCache = builder.vectorCache;
       schema = builder.readerSchema;
       maxBatchSize = builder.maxBatchSize;
+      scanLimit = builder.scanLimit;
       errorContext = builder.errorContext == null
           ? EmptyErrorContext.INSTANCE : builder.errorContext;
       if (builder.projectionFilter != null) {
@@ -126,7 +129,7 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
     IN_OVERFLOW,
 
     /**
-     * Batch is full due to reaching the row count limit
+     * Batch is full due to reaching the row count or scan limit
      * when saving a row.
      * No more writes allowed until harvesting the current batch.
      */
@@ -162,6 +165,12 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
     LOOK_AHEAD,
 
     /**
+     * The loader has reached the given scan limit. No further batches
+     * can be started.
+     */
+    LIMITED,
+
+    /**
      * Mutator is closed: no more operations are allowed.
      */
     CLOSED
@@ -238,7 +247,7 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
    * Counts the rows included in previously-harvested batches. Does not
    * include the number of rows in the current batch.
    */
-  private int previousRowCount;
+  private long previousRowCount;
 
   /**
    * Number of rows in the harvest batch. If an overflow batch is in effect,
@@ -257,6 +266,13 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
   private int targetRowCount;
 
   /**
+   * The number of rows allowed for the current batch. Is the lesser of the
+   * maximum batch size, the target row count, or the remaining margin on
+   * the scan limit.
+   */
+  private int batchSizeLimit;
+
+  /**
    * Total bytes allocated to the current batch.
    */
   protected int accumulatedBatchSize;
@@ -358,30 +374,30 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
       case HARVESTED:
       case LOOK_AHEAD:
       case START:
+      case LIMITED:
 
         // Batch is published. Use harvest schema.
         return harvestSchemaVersion;
       default:
 
-        // Not really in a position to give a schema
-        // version.
+        // Not really in a position to give a schema version.
         throw new IllegalStateException("Unexpected state: " + state);
     }
   }
 
   @Override
-  public void startBatch() {
-    startBatch(false);
+  public boolean startBatch() {
+    return startBatch(false);
   }
 
   /**
    * Start a batch to report only schema without data.
    */
-  public void startEmptyBatch() {
-    startBatch(true);
+  public boolean startEmptyBatch() {
+    return startBatch(true);
   }
 
-  public void startBatch(boolean schemaOnly) {
+  public boolean startBatch(boolean schemaOnly) {
     switch (state) {
       case HARVESTED:
       case START:
@@ -412,6 +428,9 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
         // the writers.
         break;
 
+      case LIMITED:
+        return false;
+
       default:
         throw new IllegalStateException("Unexpected state: " + state);
     }
@@ -420,7 +439,9 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
     // updates
     harvestSchemaVersion = activeSchemaVersion;
     pendingRowCount = 0;
+    batchSizeLimit = (int) Math.min(targetRowCount, options.scanLimit - 
totalRowCount());
     state = State.ACTIVE;
+    return true;
   }
 
   @Override
@@ -466,6 +487,8 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
         harvestSchemaVersion = activeSchemaVersion;
         rootWriter.startRow();
         break;
+      case LIMITED:
+        throw new IllegalStateException("Attempt to write past the scan 
limit.");
       default:
         throw new IllegalStateException("Unexpected state: " + state);
     }
@@ -523,6 +546,8 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
       case OVERFLOW:
       case FULL_BATCH:
         return true;
+      case LIMITED:
+        return true;
       default:
         return false;
     }
@@ -530,7 +555,8 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
 
   @Override
   public boolean writeable() {
-    return state == State.ACTIVE || state == State.OVERFLOW;
+    return (state == State.ACTIVE || state == State.OVERFLOW) &&
+           !atLimit();
   }
 
   private boolean isBatchActive() {
@@ -560,7 +586,7 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
 
   @Override
   public void setTargetRowCount(int rowCount) {
-    targetRowCount = Math.max(1, rowCount);
+    targetRowCount = Math.min(Math.max(1, rowCount), 
ValueVector.MAX_ROW_COUNT);
   }
 
   @Override
@@ -570,6 +596,9 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
   public int targetVectorSize() { return options.vectorSizeLimit; }
 
   @Override
+  public int maxBatchSize() { return batchSizeLimit; }
+
+  @Override
   public int skipRows(int requestedCount) {
 
     // Can only skip rows when a batch is active.
@@ -692,6 +721,10 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
 
     harvestBatchCount++;
     previousRowCount += rowCount;
+
+    if (atLimit()) {
+      state = State.LIMITED;
+    }
     return container;
   }
 
@@ -712,6 +745,19 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
   }
 
   @Override
+  public boolean atLimit() {
+    switch (state) {
+    case LIMITED:
+      return true;
+    case ACTIVE:
+    case HARVESTED:
+      return totalRowCount() >= options.scanLimit;
+    default:
+      return false;
+    }
+  }
+
+  @Override
   public TupleMetadata outputSchema() {
     return rootState.outputSchema();
   }
@@ -739,8 +785,8 @@ public class ResultSetLoaderImpl implements 
ResultSetLoader, LoaderInternals {
   }
 
   @Override
-  public int totalRowCount() {
-    int total = previousRowCount;
+  public long totalRowCount() {
+    long total = previousRowCount;
     if (isBatchActive()) {
       total += pendingRowCount + writerIndex.size();
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
index a4e75b2..5a041a9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetOptionBuilder.java
@@ -38,6 +38,7 @@ public class ResultSetOptionBuilder {
   protected ProjectionFilter projectionFilter;
   protected TupleMetadata readerSchema;
   protected long maxBatchSize;
+  protected long scanLimit = Long.MAX_VALUE;
 
   /**
    * Error message context
@@ -114,6 +115,15 @@ public class ResultSetOptionBuilder {
     return this;
   }
 
+  public ResultSetOptionBuilder limit(long limit) {
+    if (limit < 0) {
+      this.scanLimit = Long.MAX_VALUE;
+    } else {
+      this.scanLimit = limit;
+    }
+    return this;
+  }
+
   /**
    * Provides context for error messages.
    */
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
index d907a59..d66798a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/RowSetLoaderImpl.java
@@ -74,6 +74,9 @@ public class RowSetLoaderImpl extends AbstractTupleWriter 
implements RowSetLoade
 
   @Override
   public boolean start() {
+    if (rsLoader.atLimit()) {
+      return false;
+    }
     if (rsLoader.isFull()) {
 
       // Full batch? Return false.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
index fe73cbb..dc59e0b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/WriterIndexImpl.java
@@ -89,7 +89,7 @@ class WriterIndexImpl implements ColumnWriterIndex {
     return rowIndex;
   }
 
-  public boolean valid() { return rowIndex < rsLoader.targetRowCount(); }
+  public boolean valid() { return rowIndex < rsLoader.maxBatchSize(); }
 
   @Override
   public void rollover() {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
index 4edaddb..568c30c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockBatchReader.java
@@ -108,6 +108,7 @@ public class ExtendedMockBatchReader implements 
ManagedReader<SchemaNegotiator>
     if (batchSize > 0) {
       schemaNegotiator.batchSize(batchSize);
     }
+    schemaNegotiator.limit(config.getRecords());
 
     loader = schemaNegotiator.build();
     writer = loader.writer();
@@ -119,16 +120,8 @@ public class ExtendedMockBatchReader implements 
ManagedReader<SchemaNegotiator>
 
   @Override
   public boolean next() {
-    final int rowCount = config.getRecords() - loader.totalRowCount();
-    if (rowCount <= 0) {
-      return false;
-    }
-
     final Random rand = new Random();
-    for (int i = 0; i < rowCount; i++) {
-      if (writer.isFull()) {
-        break;
-      }
+    while (!writer.isFull()) {
       writer.start();
       for (int j = 0; j < fields.length; j++) {
         if (fields[j].nullable && rand.nextInt(100) < 
fields[j].nullablePercent) {
@@ -140,7 +133,7 @@ public class ExtendedMockBatchReader implements 
ManagedReader<SchemaNegotiator>
       writer.save();
     }
 
-    return true;
+    return !loader.atLimit();
   }
 
   @Override
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
index 5633943..f394bcf 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/BaseScanOperatorExecTest.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test of the scan operator framework. Here the focus is on the
@@ -46,9 +48,8 @@ import org.apache.drill.test.rowSet.RowSetUtilities;
  * details of another, supporting class, then tests for that class
  * appear elsewhere.
  */
-
 public class BaseScanOperatorExecTest extends SubOperatorTest {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BaseScanOperatorExecTest.class);
+  static final Logger logger = 
LoggerFactory.getLogger(BaseScanOperatorExecTest.class);
 
   /**
    * Base class for the "mock" readers used in this test. The mock readers
@@ -57,7 +58,6 @@ public class BaseScanOperatorExecTest extends SubOperatorTest 
{
    * They also expose internal state such as identifying which methods
    * were actually called.
    */
-
   protected static abstract class BaseMockBatchReader implements 
ManagedReader<SchemaNegotiator> {
     protected boolean openCalled;
     protected boolean closeCalled;
@@ -94,7 +94,6 @@ public class BaseScanOperatorExecTest extends SubOperatorTest 
{
    * Mock reader that pretends to have a schema at open time
    * like an HBase or JDBC reader.
    */
-
   protected static class MockEarlySchemaReader extends BaseMockBatchReader {
 
     @Override
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
index 2566bcd..938f384 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecBasics.java
@@ -378,7 +378,6 @@ public class TestScanOperExecBasics extends 
BaseScanOperatorExecTest {
    * early schema. Results in an empty (rather than null)
    * result set.
    */
-
   @Test
   public void testMultiEOFOnFirstBatch() {
     MockEarlySchemaReader reader1 = new MockEarlySchemaReader();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
index 5c6fa80..b50394d 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecEarlySchema.java
@@ -41,7 +41,6 @@ public class TestScanOperExecEarlySchema extends 
BaseScanOperatorExecTest {
   /**
    * Mock reader that returns no schema and no records.
    */
-
   private static class MockNullEarlySchemaReader extends BaseMockBatchReader {
 
     @Override
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
new file mode 100644
index 0000000..b169f58
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecLimit.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V1 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ */
+public class TestScanOperExecLimit extends BaseScanOperatorExecTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements 
ManagedReader<SchemaNegotiator> {
+    protected boolean openCalled;
+    protected ResultSetLoader tableLoader;
+
+    @Override
+    public boolean open(SchemaNegotiator negotiator) {
+      openCalled = true;
+      negotiator.tableSchema(new SchemaBuilder()
+          .add("a", MinorType.INT)
+          .build(), true);
+        tableLoader = negotiator.build();
+      return true;
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 2) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(0);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(1);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(50);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnScondBatch() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(75);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(25, batch.rowCount());
+    batch.release();
+
+    // No second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 100, at EOF of the first reader.
+   */
+  @Test
+  public void testLimitOnEOF() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(100);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second reader
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertFalse(reader2.openCalled);
+  }
+
+  /**
+   * LIMIT 125: full first reader, limit on second
+   */
+  @Test
+  public void testLimitOnSecondReader() {
+    Mock50RowReader reader1 = new Mock50RowReader();
+    Mock50RowReader reader2 = new Mock50RowReader();
+
+    BaseScanFixtureBuilder builder = simpleBuilder(reader1, reader2);
+    builder.builder.limit(125);
+    ScanFixture scanFixture = builder.build();
+    ScanOperatorExec scan = scanFixture.scanOp;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // First batch, second reader
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(25, batch.rowCount());
+    batch.release();
+
+    // No second batch
+    assertFalse(scan.next());
+
+    scanFixture.close();
+
+    assertTrue(reader1.openCalled);
+    assertTrue(reader2.openCalled);
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
index 225a3fc..21db742 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestScanOperExecOverflow.java
@@ -44,7 +44,6 @@ public class TestScanOperExecOverflow extends 
BaseScanOperatorExecTest {
    * Mock reader that produces "jumbo" batches that cause a vector to
    * fill and a row to overflow from one batch to the next.
    */
-
   private static class OverflowReader extends BaseMockBatchReader {
 
     private final String value;
@@ -111,7 +110,6 @@ public class TestScanOperExecOverflow extends 
BaseScanOperatorExecTest {
    * that overflow. Specifically, test a corner case. A batch ends right
    * at file EOF, but that last batch overflowed.
    */
-
   @Test
   public void testMultipleReadersWithOverflow() {
     runOverflowTest(false);
@@ -191,5 +189,4 @@ public class TestScanOperExecOverflow extends 
BaseScanOperatorExecTest {
     assertEquals(0, scan.batchAccessor().rowCount());
     scanFixture.close();
   }
-
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
new file mode 100644
index 0000000..19c7a84
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/TestScanLimit.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.scan.ScanOperatorExec;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ * <p>
+ * This test is from the outside: at the scan operator level.
+ */
+public class TestScanLimit extends BaseScanTest {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private static class TestFixture {
+    ObservableCreator creator1;
+    ObservableCreator creator2;
+    ScanFixture scanFixture;
+    ScanOperatorExec scan;
+
+    public TestFixture(long limit) {
+      creator1 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      creator2 = new ObservableCreator() {
+        @Override
+        public ManagedReader create(SchemaNegotiator negotiator) {
+          return new Mock50RowReader(negotiator);
+        }
+      };
+      BaseScanFixtureBuilder builder = simpleBuilder(creator1, creator2);
+      builder.builder.limit(limit);
+      scanFixture = builder.build();
+      scan = scanFixture.scanOp;
+    }
+
+    public void close() { scanFixture.close(); }
+
+    public int createCount() {
+      if (creator1.reader == null) {
+        return 0;
+      }
+      if (creator2.reader == null) {
+        return 1;
+      }
+      return 2;
+    }
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    TestFixture fixture = new TestFixture(0);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(0, batch.rowCount());
+    assertEquals(1, batch.schema().getFieldCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    TestFixture fixture = new TestFixture(1);
+    ScanOperatorExec scan = fixture.scan;
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(1, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    TestFixture fixture = new TestFixture(50);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second batch or second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnSecondBatch() {
+    TestFixture fixture = new TestFixture(75);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(25, batch.rowCount());
+    batch.release();
+
+    // No second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 100, at EOF of the first reader.
+   */
+  @Test
+  public void testLimitOnEOF() {
+    TestFixture fixture = new TestFixture(100);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // No second reader
+    assertFalse(scan.next());
+
+    fixture.close();
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, fixture.createCount());
+  }
+
+  /**
+   * LIMIT 125: full first reader, limit on second
+   */
+  @Test
+  public void testLimitOnSecondReader() {
+    TestFixture fixture = new TestFixture(125);
+    ScanOperatorExec scan = fixture.scan;
+
+    assertTrue(scan.buildSchema());
+    assertTrue(scan.next());
+    BatchAccessor batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(50, batch.rowCount());
+    batch.release();
+
+    // First batch, second reader
+    assertTrue(scan.next());
+    batch = scan.batchAccessor();
+    assertEquals(25, batch.rowCount());
+    batch.release();
+
+    // No second batch
+    assertFalse(scan.next());
+
+    fixture.close();
+
+    // Both readers were created.
+    assertEquals(2, fixture.createCount());
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
index df1fa0c..3df94ff 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/BaseTestScanLifecycle.java
@@ -89,6 +89,8 @@ public class BaseTestScanLifecycle extends SubOperatorTest {
     public boolean hasNext() {
       return counter < 2;
     }
+
+    public int count() { return counter; }
   }
 
   public static final TupleMetadata SCHEMA = new SchemaBuilder()
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
new file mode 100644
index 0000000..bfde5ed
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/v3/lifecycle/TestScanLifecycleLimit.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.v3.lifecycle;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.RowBatchReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetTestUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.junit.Test;
+
+/**
+ * Verifies that the V2 scan framework properly pushes the LIMIT
+ * into the scan by stopping the scan once any reader produces
+ * the rows up to the limit. Verifies that the result set loader
+ * enforces the limit at the batch level, that the reader lifecycle
+ * enforces limits at the reader level, and that the scan framework
+ * enforces limits by skipping over unneeded readers.
+ * <p>
+ * This test is at the level of the scan framework, stripping away
+ * the outer scan operator level.
+ */
+public class TestScanLifecycleLimit extends BaseTestScanLifecycle {
+
+  /**
+   * Mock reader that returns two 50-row batches.
+   */
+  protected static class Mock50RowReader implements ManagedReader {
+
+    private final ResultSetLoader tableLoader;
+
+    public Mock50RowReader(SchemaNegotiator negotiator) {
+      negotiator.tableSchema(new SchemaBuilder()
+        .add("a", MinorType.INT)
+        .build());
+      tableLoader = negotiator.build();
+    }
+
+    @Override
+    public boolean next() {
+      if (tableLoader.batchCount() > 1) {
+        return false;
+      }
+      RowSetLoader rowSet = tableLoader.writer();
+      int base = tableLoader.batchCount() * 50 + 1;
+      for (int i = 0; i < 50; i++) {
+        if (rowSet.isFull()) {
+          break;
+        }
+        rowSet.addSingleCol(base + i);
+      }
+      return true;
+    }
+
+    @Override
+    public void close() { }
+  }
+
+  private Pair<TwoReaderFactory, ScanLifecycle> setupScan(long limit) {
+    ScanLifecycleBuilder builder = new ScanLifecycleBuilder();
+    builder.projection(RowSetTestUtils.projectList("a"));
+    TwoReaderFactory factory = new TwoReaderFactory() {
+      @Override
+      public ManagedReader firstReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+
+      @Override
+      public ManagedReader secondReader(SchemaNegotiator negotiator) {
+        return new Mock50RowReader(negotiator);
+      }
+    };
+    builder.readerFactory(factory);
+
+    builder.limit(limit);
+    return Pair.of(factory, buildScan(builder));
+  }
+
+  /**
+   * LIMIT 0, to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(0);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, but returns no data, though the reader
+    // itself is happy to provide data.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(0, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 1, simplest case
+   */
+  @Test
+  public void testLimit1() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(1);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    // Reader builds schema, and stops after one row, though the reader
+    // itself is happy to provide more.
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(1, result.rowCount());
+    assertEquals(1, result.schema().size());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 50, same as batch size, to check boundary conditions.
+   */
+  @Test
+  public void testLimitOnBatchEnd() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(50);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 75, halfway through second batch.
+   */
+  @Test
+  public void testLimitOnSecondBatch() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(75);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+
+    // First batch
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // Second batch
+    assertTrue(reader.next());
+    result = fixture.wrap(reader.output());
+    assertEquals(25, result.rowCount());
+    result.clear();
+
+    // No third batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 100, at EOF of the first reader.
+   */
+  @Test
+  public void testLimitOnEOF() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(100);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+
+    // First batch
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // Second batch
+    assertTrue(reader.next());
+    result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // No third batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // No next reader, despite there being two, since we hit the limit.
+    assertNull(scan.nextReader());
+
+    scan.close();
+
+    // Only the first of the two readers was created.
+    assertEquals(1, factory.count());
+  }
+
+  /**
+   * LIMIT 125: full first reader, limit on second
+   */
+  @Test
+  public void testLimitOnSecondReader() {
+    Pair<TwoReaderFactory, ScanLifecycle> pair = setupScan(125);
+    TwoReaderFactory factory = pair.getLeft();
+    ScanLifecycle scan = pair.getRight();
+
+    RowBatchReader reader = scan.nextReader();
+    assertTrue(reader.open());
+
+    // First batch
+    assertTrue(reader.next());
+    RowSet result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // Second batch
+    assertTrue(reader.next());
+    result = fixture.wrap(reader.output());
+    assertEquals(50, result.rowCount());
+    result.clear();
+
+    // No third batch
+    assertFalse(reader.next());
+    reader.close();
+
+    // Move to second reader.
+    reader = scan.nextReader();
+    assertNotNull(reader);
+    assertTrue(reader.open());
+
+    // First batch is limited
+    assertTrue(reader.next());
+    result = fixture.wrap(reader.output());
+    assertEquals(25, result.rowCount());
+    result.clear();
+
+    // No second batch
+    assertFalse(reader.next());
+    reader.close();
+
+    scan.close();
+
+    // Both readers were created.
+    assertEquals(2, factory.count());
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
index 2104326..49ff638 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderLimits.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.resultSet.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
@@ -29,7 +30,9 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import 
org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
+import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.test.SubOperatorTest;
 import org.junit.Test;
@@ -46,7 +49,6 @@ import org.junit.experimental.categories.Category;
  * in fact, depend on the row count) and vector overflow (which an occur when
  * the row limit turns out to be too large.)
  */
-
 @Category(RowSetTests.class)
 public class TestResultSetLoaderLimits extends SubOperatorTest {
 
@@ -54,7 +56,6 @@ public class TestResultSetLoaderLimits extends 
SubOperatorTest {
    * Verify that the writer stops when reaching the row limit.
    * In this case there is no look-ahead row.
    */
-
   @Test
   public void testRowLimit() {
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
@@ -100,7 +101,6 @@ public class TestResultSetLoaderLimits extends 
SubOperatorTest {
   /**
    * Verify that the caller can set a row limit lower than the default.
    */
-
   @Test
   public void testCustomRowLimit() {
 
@@ -183,7 +183,6 @@ public class TestResultSetLoaderLimits extends 
SubOperatorTest {
   /**
    * Test that the row limit can change between batches.
    */
-
   @Test
   public void testDynamicLimit() {
 
@@ -224,6 +223,108 @@ public class TestResultSetLoaderLimits extends 
SubOperatorTest {
     assertEquals(count, rootWriter.rowCount());
     rsLoader.harvest().clear();
 
+    // Test limits
+    rsLoader.setTargetRowCount(-3);
+    assertEquals(1, rsLoader.targetRowCount());
+    rsLoader.setTargetRowCount(Integer.MAX_VALUE);
+    assertEquals(ValueVector.MAX_ROW_COUNT, rsLoader.targetRowCount());
+
+    rsLoader.close();
+  }
+
+  /**
+   * Limit 0 is used to obtain only the schema.
+   */
+  @Test
+  public void testLimit0() {
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .limit(0)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), 
options);
+
+    // Can define a schema-only batch.
+    assertTrue(rsLoader.startBatch());
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, 
DataMode.REQUIRED));
+
+    // But, can't add any rows.
+    assertTrue(rootWriter.isFull());
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(0, result.rowCount());
+    assertTrue(rsLoader.atLimit());
+    TupleMetadata schema = new SchemaBuilder()
+        .add("s", MinorType.VARCHAR)
+        .buildSchema();
+    assertTrue(schema.equals(result.schema()));
+    result.clear();
+
+    // Can't start a data batch.
+    assertFalse(rsLoader.startBatch());
+
+    // Can't start a row.
+    assertFalse(rootWriter.start());
+
+    rsLoader.close();
+  }
+
+  /**
+   * Pathological limit case: a single row.
+   */
+  @Test
+  public void testLimit1() {
+
+    // Start with a small limit.
+
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .limit(1)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), 
options);
+
+    assertTrue(rsLoader.startBatch());
+    assertEquals(1, rsLoader.maxBatchSize());
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, 
DataMode.REQUIRED));
+    rootWriter.addRow("foo");
+    assertTrue(rootWriter.isFull());
+    assertFalse(rootWriter.start());
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    result.clear();
+    assertTrue(rsLoader.atLimit());
+    rsLoader.close();
+  }
+
+  /**
+   * Test filling one batch normally, then hitting the scan limit on the second.
+   */
+  @Test
+  public void testLimit100() {
+    ResultSetOptions options = new ResultSetOptionBuilder()
+        .rowCountLimit(75)
+        .limit(100)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), 
options);
+
+    RowSetLoader rootWriter = rsLoader.writer();
+    rootWriter.addColumn(SchemaBuilder.columnSchema("s", MinorType.VARCHAR, 
DataMode.REQUIRED));
+
+    rsLoader.startBatch();
+    int count = fillToLimit(rootWriter);
+    assertEquals(75, count);
+    assertEquals(count, rootWriter.rowCount());
+    rsLoader.harvest().clear();
+    assertFalse(rsLoader.atLimit());
+
+    // Second batch will hit the limit
+
+    rsLoader.startBatch();
+    count = fillToLimit(rootWriter);
+    assertEquals(25, count);
+    assertEquals(count, rootWriter.rowCount());
+    rsLoader.harvest().clear();
+    assertTrue(rsLoader.atLimit());
+
     rsLoader.close();
   }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
index 847c953..c68bdae 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetLoaderProtocol.java
@@ -77,7 +77,6 @@ import org.junit.experimental.categories.Category;
  * the structure. The object tree will show all the components and their
  * current state.
  */
-
 @Category(RowSetTests.class)
 public class TestResultSetLoaderProtocol extends SubOperatorTest {
 
@@ -317,7 +316,6 @@ public class TestResultSetLoaderProtocol extends 
SubOperatorTest {
    * additional information. The code here simply uses the 
<tt>MaterializedField</tt>
    * to create a <tt>ColumnMetadata</tt> implicitly.
    */
-
   @Test
   public void testCaseInsensitiveSchema() {
     ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator());
@@ -573,7 +571,6 @@ public class TestResultSetLoaderProtocol extends 
SubOperatorTest {
    * Also verifies the test-time method to set a row of values using
    * a single method.
    */
-
   @Test
   public void testInitialSchema() {
     TupleMetadata schema = new SchemaBuilder()
@@ -606,7 +603,7 @@ public class TestResultSetLoaderProtocol extends 
SubOperatorTest {
 
   /**
    * The writer protocol allows a client to write to a row any number of times
-   * before invoking <tt>save()</tt>. In this case, each new value simply
+   * before invoking {@code save()}. In this case, each new value simply
    * overwrites the previous value. Here, we test the most basic case: a 
simple,
    * flat tuple with no arrays. We use a very large Varchar that would, if
    * overwrite were not working, cause vector overflow.
@@ -628,7 +625,6 @@ public class TestResultSetLoaderProtocol extends 
SubOperatorTest {
    * Note that there is no explicit method to discard a row. Instead,
    * the rule is that a row is not saved until <tt>save()</tt> is called.
    */
-
   @Test
   public void testOverwriteRow() {
     TupleMetadata schema = new SchemaBuilder()
@@ -685,7 +681,6 @@ public class TestResultSetLoaderProtocol extends 
SubOperatorTest {
    * Test that memory is released if the loader is closed with an active
    * batch (that is, before the batch is harvested.)
    */
-
   @Test
   public void testCloseWithoutHarvest() {
     TupleMetadata schema = new SchemaBuilder()
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
index d3b2f85..f137a24 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/WriterEvents.java
@@ -39,7 +39,6 @@ import 
org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;
  * array. Instead, the order of calls, or selectively making or not making
  * calls, can change.
  */
-
 public interface WriterEvents extends WriterPosition {
 
   /**
@@ -47,7 +46,6 @@ public interface WriterEvents extends WriterPosition {
    * operations to newly-added columns to synchronize them with the rest
    * of the writers.
    */
-
   enum State {
 
     /**
@@ -57,20 +55,18 @@ public interface WriterEvents extends WriterPosition {
     IDLE,
 
     /**
-     * <tt>startWrite()</tt> has been called to start a write operation
-     * (start a batch), but <tt>startValue()</tt> has not yet been called
-     * to start a row (or value within an array). <tt>startWrite()</tt> must
+     * {@code startWrite()} has been called to start a write operation
+     * (start a batch), but {@code startValue()} has not yet been called
+     * to start a row (or value within an array). {@code startWrite()} must
      * be called on newly added columns.
      */
-
     IN_WRITE,
 
     /**
-     * Both <tt>startWrite()</tt> and <tt>startValue()</tt> has been called on
+     * Both {@code startWrite()} and {@code startValue()} has been called on
      * the tuple to prepare for writing values, and both must be called on
      * newly-added vectors.
      */
-
     IN_ROW
   }
 
@@ -80,7 +76,6 @@ public interface WriterEvents extends WriterPosition {
    * listener is bound, and a vector overflows, then an exception is
    * thrown.
    */
-
   interface ColumnWriterListener {
 
     /**
@@ -90,7 +85,6 @@ public interface WriterEvents extends WriterPosition {
      *
      * @param writer the writer that triggered the overflow
      */
-
     void overflowed(ScalarWriter writer);
 
     /**
@@ -102,9 +96,8 @@ public interface WriterEvents extends WriterPosition {
      * @param delta the amount by which the vector is to grow
      * @return true if the vector can be grown, false if the writer
      * should instead trigger an overflow by calling
-     * <tt>overflowed()</tt>
+     * {@code overflowed()}
      */
-
     boolean canExpand(ScalarWriter writer, int delta);
   }
 
@@ -114,7 +107,6 @@ public interface WriterEvents extends WriterPosition {
    * @param index the writer index (top level or nested for
    * arrays)
    */
-
   void bindIndex(ColumnWriterIndex index);
 
   /**
@@ -127,21 +119,18 @@ public interface WriterEvents extends WriterPosition {
    * @param listener
    *          the vector event listener to bind
    */
-
   void bindListener(ColumnWriterListener listener);
 
   /**
    * Start a write (batch) operation. Performs any vector initialization
    * required at the start of a batch (especially for offset vectors.)
    */
-
   void startWrite();
 
   /**
    * Start a new row. To be called only when a row is not active. To
    * restart a row, call {@link #restartRow()} instead.
    */
-
   void startRow();
 
   /**
@@ -152,7 +141,6 @@ public interface WriterEvents extends WriterPosition {
    * offset vector based on the cumulative value saves is done when
    * saving the row.
    */
-
   void endArrayValue();
 
   /**
@@ -161,27 +149,23 @@ public interface WriterEvents extends WriterPosition {
    * Done when abandoning the current row, such as when filtering out
    * a row at read time.
    */
-
   void restartRow();
 
   /**
    * Saves a row. Commits offset vector locations and advances each to
    * the next position. Can be called only when a row is active.
    */
-
   void saveRow();
 
   /**
    * End a batch: finalize any vector values.
    */
-
   void endWrite();
 
   /**
    * The vectors backing this vector are about to roll over. Finish
    * the current batch up to, but not including, the current row.
    */
-
   void preRollover();
 
   /**
@@ -191,7 +175,6 @@ public interface WriterEvents extends WriterPosition {
    * for the current row now resides at the start of a new vector instead
    * of its previous location elsewhere in an old vector.
    */
-
   void postRollover();
 
   abstract void dump(HierarchicalFormatter format);

Reply via email to