This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new e692fc6  DRILL-8085: EVF V2 support in the "Easy" format plugin (#2419)
e692fc6 is described below

commit e692fc6b621838cfd4df5b4be878892abae234ce
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Mar 1 00:35:57 2022 -0800

    DRILL-8085: EVF V2 support in the "Easy" format plugin (#2419)
---
 .../drill/exec/store/esri/ShpFormatPlugin.java     |   2 +-
 .../drill/exec/store/excel/ExcelFormatPlugin.java  |   2 +-
 .../drill/exec/store/hdf5/HDF5FormatPlugin.java    |   2 +-
 .../exec/store/httpd/HttpdLogFormatPlugin.java     |   3 +-
 .../drill/exec/store/image/ImageFormatPlugin.java  |   3 +-
 .../store/pcap/plugin/BasePcapFormatPlugin.java    |   3 +-
 .../drill/exec/store/pdf/PdfFormatPlugin.java      |   3 +-
 .../drill/exec/store/sas/SasFormatPlugin.java      |   3 +-
 .../drill/exec/store/spss/SpssFormatPlugin.java    |   3 +-
 .../exec/store/syslog/SyslogFormatPlugin.java      |   3 +-
 .../drill/exec/store/xml/XMLFormatPlugin.java      |   3 +-
 .../drill/exec/physical/impl/scan/ReaderState.java |   6 +-
 .../drill/exec/store/avro/AvroFormatPlugin.java    |   2 +-
 .../exec/store/dfs/easy/ClassicScanBuilder.java    | 103 ++++++++++
 .../exec/store/dfs/easy/EasyFileScanBuilder.java   |  83 ++++++++
 .../exec/store/dfs/easy/EasyFormatPlugin.java      | 221 ++++++++-------------
 .../exec/store/dfs/easy/EvfV1ScanBuilder.java      | 142 +++++++++++++
 .../sequencefile/SequenceFileFormatPlugin.java     |   5 +-
 .../exec/store/easy/text/TextFormatPlugin.java     |   4 +-
 .../drill/exec/store/log/LogFormatPlugin.java      |   2 +-
 20 files changed, 439 insertions(+), 159 deletions(-)

diff --git 
a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
 
b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index d93243c..1d84491 100644
--- 
a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ 
b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -78,7 +78,7 @@ public class ShpFormatPlugin extends 
EasyFormatPlugin<ShpFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(PLUGIN_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
 
b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
index 58cae94..063f1f7 100644
--- 
a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
+++ 
b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
@@ -73,7 +73,7 @@ public class ExcelFormatPlugin extends 
EasyFormatPlugin<ExcelFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
 
b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index e26ed67..5551af5 100644
--- 
a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++ 
b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -56,7 +56,7 @@ public class HDF5FormatPlugin extends 
EasyFormatPlugin<HDF5FormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
 
b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index c3120d1..ab6d80a 100644
--- 
a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ 
b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class HttpdLogFormatPlugin extends 
EasyFormatPlugin<HttpdLogFormatConfig> {
@@ -75,7 +76,7 @@ public class HttpdLogFormatPlugin extends 
EasyFormatPlugin<HttpdLogFormatConfig>
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
         .readerOperatorType(OPERATOR_TYPE)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
 
b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 22ee644..977faec 100644
--- 
a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++ 
b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
@@ -49,7 +50,7 @@ public class ImageFormatPlugin extends 
EasyFormatPlugin<ImageFormatConfig> {
         .compressible(true)
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .supportsProjectPushdown(true)
         .defaultName(ImageFormatConfig.NAME)
diff --git 
a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
 
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
index 4052245..836c254 100644
--- 
a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
+++ 
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.drill.exec.store.pcap.PcapBatchReader;
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
 import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
@@ -63,7 +64,7 @@ public abstract class BasePcapFormatPlugin<T extends 
PcapFormatConfig> extends E
         .compressible(true)
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .supportsProjectPushdown(true)
         .defaultName(PcapFormatConfig.NAME)
diff --git 
a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
 
b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
index 01ceece..53fb487 100644
--- 
a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
+++ 
b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -67,7 +68,7 @@ public class PdfFormatPlugin extends 
EasyFormatPlugin<PdfFormatConfig> {
       .extensions(pluginConfig.extensions())
       .fsConf(fsConf)
       .defaultName(DEFAULT_NAME)
-      .useEnhancedScan(true)
+      .scanVersion(ScanFrameworkVersion.EVF_V1)
       .supportsLimitPushdown(true)
       .build();
   }
diff --git 
a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
 
b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
index 442e48b..da8bcbc 100644
--- 
a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
+++ 
b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -68,7 +69,7 @@ public class SasFormatPlugin extends 
EasyFormatPlugin<SasFormatConfig> {
       .extensions(pluginConfig.getExtensions())
       .fsConf(fsConf)
       .defaultName(DEFAULT_NAME)
-      .useEnhancedScan(true)
+      .scanVersion(ScanFrameworkVersion.EVF_V1)
       .supportsLimitPushdown(true)
       .build();
   }
diff --git 
a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
 
b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 7e3e94f..e62699a 100644
--- 
a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ 
b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -67,7 +68,7 @@ public class SpssFormatPlugin extends 
EasyFormatPlugin<SpssFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
 
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index b439c5e..aec9369 100644
--- 
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ 
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
@@ -69,7 +70,7 @@ public class SyslogFormatPlugin extends 
EasyFormatPlugin<SyslogFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
 
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
index b0a3e82..14ce069 100644
--- 
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
+++ 
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import 
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
@@ -67,7 +68,7 @@ public class XMLFormatPlugin extends 
EasyFormatPlugin<XMLFormatConfig> {
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index a9a6fe9..ac83b49 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -137,7 +137,6 @@ class ReaderState {
     /**
      * Initial state before opening the reader.
      */
-
     START,
 
     /**
@@ -157,7 +156,6 @@ class ReaderState {
      * the next call to {@link ReaderState#next()} will return this look-ahead
      * batch rather than reading a new one.
      */
-
     LOOK_AHEAD,
 
     /**
@@ -173,20 +171,17 @@ class ReaderState {
      * row in the result set loader. That look-ahead is handled by the
      * (shim) reader which this class manages.
      */
-
     LOOK_AHEAD_WITH_EOF,
 
     /**
      * Normal state: the reader has supplied data but not yet reported EOF.
      */
-
     ACTIVE,
 
     /**
      * The reader has reported EOF. No look-ahead batch is active. The
      * reader's next() method will no longer be called.
      */
-
     EOF,
 
     /**
@@ -250,6 +245,7 @@ class ReaderState {
       throw UserException.validationError(e)
         .message("Invalid runtime type conversion")
         .build(logger);
+
     } catch (Throwable t) {
 
       // Wrap all others in a user exception.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5051cb4..5235b00 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -53,7 +53,7 @@ public class AvroFormatPlugin extends 
EasyFormatPlugin<AvroFormatConfig> {
         .extensions(formatConfig.getExtensions())
         .fsConf(fsConf)
         .defaultName(DEFAULT_NAME)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
new file mode 100644
index 0000000..08442c7
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.base.Functions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+/**
+ * Build the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained for backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+public class ClassicScanBuilder {
+
+  private final FragmentContext context;
+  private EasySubScan scan;
+  private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+  /**
+   * @param context fragment context; supplies the options and creates the
+   * operator context for the scan
+   * @param scan the sub-scan listing the columns and file work units to read
+   * @param plugin the format plugin which creates the record readers and
+   * provides the file system configuration
+   */
+  public ClassicScanBuilder(FragmentContext context, EasySubScan scan,
+      EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+    this.context = context;
+    this.scan = scan;
+    this.plugin = plugin;
+  }
+
+  /**
+   * Create one {@link RecordReader} per file work unit, compute the implicit
+   * column values for each, and wrap them all in a {@link ScanBatch}.
+   *
+   * @return the scan batch over all work units of the sub-scan
+   * @throws ExecutionSetupException if the file system cannot be created;
+   * the other setup calls made here may also raise it
+   */
+  public CloseableRecordBatch build() throws ExecutionSetupException {
+    final ColumnExplorer columnExplorer =
+        new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+    if (! columnExplorer.isStarQuery()) {
+      // Rebuild the sub-scan with only the table columns. Keep a reference to
+      // the original: once "scan" is reassigned, reading the operator ID back
+      // from "scan" would merely copy the new object's ID onto itself.
+      final EasySubScan original = scan;
+      scan = new EasySubScan(original.getUserName(), original.getWorkUnits(),
+          original.getFormatPlugin(), columnExplorer.getTableColumns(),
+          original.getSelectionRoot(), original.getPartitionDepth(),
+          original.getSchema(), original.getMaxRecords());
+      scan.setOperatorId(original.getOperatorId());
+    }
+
+    final OperatorContext oContext = context.newOperatorContext(scan);
+    final DrillFileSystem dfs;
+    try {
+      dfs = oContext.newFileSystem(plugin.getFsConf());
+    } catch (final IOException e) {
+      throw new ExecutionSetupException(
+          String.format("Failed to create FileSystem: %s", e.getMessage()), e);
+    }
+
+    final List<RecordReader> readers = new LinkedList<>();
+    final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+    // Tracks the largest implicit-column map seen; used below as the template
+    // of column names every reader must expose.
+    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+    final boolean supportsFileImplicitColumns =
+        scan.getSelectionRoot() != null;
+    for (final FileWork work : scan.getWorkUnits()) {
+      final RecordReader recordReader = plugin.getRecordReader(
+          context, dfs, work, scan.getColumns(), scan.getUserName());
+      readers.add(recordReader);
+      final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+          work.getPath(), scan.getSelectionRoot(), false);
+      final Map<String, String> implicitValues =
+          columnExplorer.populateColumns(
+              work.getPath(), partitionValues, supportsFileImplicitColumns, dfs);
+      implicitColumns.add(implicitValues);
+      if (implicitValues.size() > mapWithMaxColumns.size()) {
+        mapWithMaxColumns = implicitValues;
+      }
+    }
+
+    // all readers should have the same number of implicit columns, add
+    // missing ones with value null
+    final Map<String, String> diff =
+        Maps.transformValues(mapWithMaxColumns, Functions.constant(null));
+    for (final Map<String, String> map : implicitColumns) {
+      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+    }
+
+    return new ScanBatch(context, oContext, readers, implicitColumns);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
new file mode 100644
index 0000000..65f9c71
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException.Builder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import 
org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+
+/**
+ * Create the file scan lifecycle that manages the scan. The lifecycle
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles a provided schema. Also handles "early"
+ * or "late" schema readers. This version handles the scan limit automatically.
+ * <p>
+ * This is for "version 2" of EVF. Newer code should use this version.
+ */
+public class EasyFileScanBuilder extends FileScanLifecycleBuilder {
+
+  /**
+   * Error context which labels a user exception with the format plugin's
+   * default name, its class name, its configured name and, when one is set,
+   * the selection root of the scan.
+   */
+  public static class EvfErrorContext implements CustomErrorContext {
+    private final EasySubScan scan;
+    private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+    public EvfErrorContext(EasySubScan scan,
+        EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+      this.scan = scan;
+      this.plugin = plugin;
+    }
+
+    @Override
+    public void addContext(Builder builder) {
+      builder
+        .addContext("Format plugin type", plugin.easyConfig().getDefaultName())
+        .addContext("Format plugin class", plugin.getClass().getSimpleName())
+        .addContext("Plugin config name", plugin.getName());
+      // The selection root is optional; add it only when present.
+      if (scan.getSelectionRoot() != null) {
+        builder.addContext("Table directory", 
scan.getSelectionRoot().toString());
+      }
+    }
+  }
+
+  /**
+   * Configure the inherited {@link FileScanLifecycleBuilder} from the
+   * fragment options, the sub-scan, and the plugin's configuration.
+   *
+   * @param context the fragment context; supplies the option set
+   * @param scan the physical operation definition for the scan operation.
+   * Contains one or more files to read. (The Easy format plugin works only
+   * for files.)
+   * @param plugin the format plugin; supplies the file system configuration
+   * and whether its files are compressible
+   */
+  public EasyFileScanBuilder(FragmentContext context, EasySubScan scan,
+      EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+
+    options(context.getOptions());
+    projection(scan.getColumns());
+    userName(scan.getUserName());
+    providedSchema(scan.getSchema());
+    fileSystemConfig(plugin.getFsConf());
+    fileSplitImpls(scan.getWorkUnits());
+    rootDir(scan.getSelectionRoot());
+    maxPartitionDepth(scan.getPartitionDepth());
+    compressible(plugin.easyConfig().isCompressible());
+    limit(scan.getLimit());
+    errorContext(new EvfErrorContext(scan, plugin));
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d701da4..15ddf6b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -19,10 +19,8 @@ package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -31,8 +29,8 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -41,17 +39,16 @@ import 
org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
 import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
 import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import 
org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
 import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
 import org.apache.drill.exec.store.StatisticsRecordWriter;
@@ -62,16 +59,10 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
-import org.apache.drill.shaded.guava.com.google.common.base.Functions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -84,7 +75,28 @@ import org.slf4j.LoggerFactory;
  * @param <T> the format plugin config for this reader
  */
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> 
implements FormatPlugin {
-  private static final Logger logger = 
LoggerFactory.getLogger(EasyFormatPlugin.class);
+
+  /**
+   * Selects the scan framework implementation that a format plugin uses,
+   * as set through {@code EasyFormatConfigBuilder.scanVersion()}.
+   */
+  public enum ScanFrameworkVersion {
+
+    /**
+     * Use the "classic" low-level implementation based on the Scan Batch.
+     * This is the value the config builder starts with.
+     */
+    CLASSIC,
+
+    /**
+     * Use the first-generation "EVF" framework.
+     */
+    EVF_V1,
+
+    /**
+     * Use the second-generation EVF framework. New code should use this option.
+     * EVF V2 automatically handles LIMIT push-down. Readers that use it should
+     * check the return value of {@code startBatch()} to detect when they should
+     * stop reading. Readers can also call {@code atLimit()} after each
+     * {@code harvest()} call to detect the limit and report EOF.
+     */
+    EVF_V2
+  }
 
   /**
    * Defines the static, programmer-defined options for this plugin. These
@@ -120,7 +132,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
      *  structure. Can also be selected at runtime by overriding
      *  {@link #useEnhancedScan()}.
      */
-    private final boolean useEnhancedScan;
+    private final ScanFrameworkVersion scanVersion;
 
     public EasyFormatConfig(EasyFormatConfigBuilder builder) {
       this.matcher = builder.matcher;
@@ -138,7 +150,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
       this.supportsStatistics = builder.supportsStatistics;
       this.readerOperatorType = builder.readerOperatorType;
       this.writerOperatorType = builder.writerOperatorType;
-      this.useEnhancedScan = builder.useEnhancedScan;
+      this.scanVersion = builder.scanVersion;
     }
 
     public BasicFormatMatcher getMatcher() {
@@ -201,8 +213,8 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
       return writerOperatorType;
     }
 
-    public boolean useEnhancedScan() {
-      return useEnhancedScan;
+    public ScanFrameworkVersion scanVersion() {
+      return scanVersion;
     }
 
     public static EasyFormatConfigBuilder builder() {
@@ -226,7 +238,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
           .supportsStatistics(supportsStatistics)
           .readerOperatorType(readerOperatorType)
           .writerOperatorType(writerOperatorType)
-          .useEnhancedScan(useEnhancedScan);
+          .scanVersion(scanVersion);
     }
   }
 
@@ -246,7 +258,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
     private boolean supportsStatistics;
     private String readerOperatorType;
     private String writerOperatorType = "";
-    private boolean useEnhancedScan;
+    private ScanFrameworkVersion scanVersion = ScanFrameworkVersion.CLASSIC;
 
     public EasyFormatConfigBuilder matcher(BasicFormatMatcher matcher) {
       this.matcher = matcher;
@@ -328,8 +340,8 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
       return this;
     }
 
-    public EasyFormatConfigBuilder useEnhancedScan(boolean useEnhancedScan) {
-      this.useEnhancedScan = useEnhancedScan;
+    public EasyFormatConfigBuilder scanVersion(ScanFrameworkVersion 
scanVersion) {
+      this.scanVersion = scanVersion;
       return this;
     }
 
@@ -342,35 +354,6 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
     }
   }
 
-  /**
-   * Builds the readers row-set based scan operator.
-   */
-  private static class EasyReaderFactory extends FileReaderFactory {
-
-    private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
-    private final EasySubScan scan;
-    private final FragmentContext context;
-
-    public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig> 
plugin,
-        EasySubScan scan, FragmentContext context) {
-      this.plugin = plugin;
-      this.scan = scan;
-      this.context = context;
-    }
-
-    @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      try {
-        return plugin.newBatchReader(scan, context.getOptions());
-      } catch (ExecutionSetupException e) {
-        throw UserException.validationError(e)
-          .addContext("Reason", "Failed to create a batch reader")
-          .addContext(errorContext())
-          .build(logger);
-      }
-    }
-  }
-
   private final String name;
   private final EasyFormatConfig easyConfig;
   private final DrillbitContext context;
@@ -410,7 +393,6 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    * @param formatConfig the Jackson-serialized format configuration as created
    * by the user in the Drill web console. Holds user-defined options
    */
-
   protected EasyFormatPlugin(String name, EasyFormatConfig config, 
DrillbitContext context,
       StoragePluginConfig storageConfig, T formatConfig) {
     this.name = name;
@@ -502,10 +484,25 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
 
   protected CloseableRecordBatch getReaderBatch(FragmentContext context,
       EasySubScan scan) throws ExecutionSetupException {
-    if (useEnhancedScan()) {
-      return buildScan(context, scan);
-    } else {
-      return buildScanBatch(context, scan);
+    try {
+      switch (scanVersion(context.getOptions())) {
+      case CLASSIC:
+        return buildScanBatch(context, scan);
+      case EVF_V1:
+        return buildScan(context, scan);
+      case EVF_V2:
+        return buildScanV3(context, scan);
+      default:
+        throw new IllegalStateException("Unknown scan version");
+      }
+    } catch (final UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (final ExecutionSetupException e) {
+      throw e;
+    } catch (final Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
     }
   }
 
@@ -518,8 +515,8 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    * @return true to use the enhanced scan framework, false for the
    * traditional scan-batch framework
    */
-  protected boolean useEnhancedScan() {
-    return easyConfig.useEnhancedScan();
+  protected ScanFrameworkVersion scanVersion(OptionManager options) {
+    return easyConfig.scanVersion;
   }
 
   /**
@@ -531,49 +528,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    */
   private CloseableRecordBatch buildScanBatch(FragmentContext context,
       EasySubScan scan) throws ExecutionSetupException {
-    final ColumnExplorer columnExplorer =
-        new ColumnExplorer(context.getOptions(), scan.getColumns());
-
-    if (! columnExplorer.isStarQuery()) {
-      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), 
scan.getFormatPlugin(),
-          columnExplorer.getTableColumns(), scan.getSelectionRoot(),
-          scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
-      scan.setOperatorId(scan.getOperatorId());
-    }
-
-    final OperatorContext oContext = context.newOperatorContext(scan);
-    final DrillFileSystem dfs;
-    try {
-      dfs = oContext.newFileSystem(easyConfig().fsConf);
-    } catch (final IOException e) {
-      throw new ExecutionSetupException(String.format("Failed to create 
FileSystem: %s", e.getMessage()), e);
-    }
-
-    final List<RecordReader> readers = new LinkedList<>();
-    final List<Map<String, String>> implicitColumns = Lists.newArrayList();
-    Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
-    final boolean supportsFileImplicitColumns = scan.getSelectionRoot() != 
null;
-    for (final FileWork work : scan.getWorkUnits()) {
-      final RecordReader recordReader = getRecordReader(
-          context, dfs, work, scan.getColumns(), scan.getUserName());
-      readers.add(recordReader);
-      final List<String> partitionValues = ColumnExplorer.listPartitionValues(
-          work.getPath(), scan.getSelectionRoot(), false);
-      final Map<String, String> implicitValues = 
columnExplorer.populateColumns(
-          work.getPath(), partitionValues, supportsFileImplicitColumns, dfs);
-      implicitColumns.add(implicitValues);
-      if (implicitValues.size() > mapWithMaxColumns.size()) {
-        mapWithMaxColumns = implicitValues;
-      }
-    }
-
-    // all readers should have the same number of implicit columns, add 
missing ones with value null
-    final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns, 
Functions.constant(null));
-    for (final Map<String, String> map : implicitColumns) {
-      map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
-    }
-
-    return new ScanBatch(context, oContext, readers, implicitColumns);
+    return new ClassicScanBuilder(context, scan, this).build();
   }
 
   /**
@@ -584,22 +539,7 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    */
   private CloseableRecordBatch buildScan(FragmentContext context,
       EasySubScan scan) throws ExecutionSetupException {
-    try {
-      final FileScanBuilder builder = frameworkBuilder(context.getOptions(), 
scan);
-
-      // Add batch reader, if none specified
-
-      if (builder.readerFactory() == null) {
-        builder.setReaderFactory(new EasyReaderFactory(this, scan, context));
-      }
-      return builder.buildScanOperator(context, scan);
-    } catch (final UserException e) {
-      // Rethrow user exceptions directly
-      throw e;
-    } catch (final Throwable e) {
-      // Wrap all others
-      throw new ExecutionSetupException(e);
-    }
+    return new EvfV1ScanBuilder(context, scan, this).build();
   }
 
   /**
@@ -607,6 +547,8 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    * Call this from the plugin-specific
    * {@link #frameworkBuilder(OptionManager, EasySubScan)} method.
    * The plugin can then customize/revise options as needed.
+   * <p>
+   * For EVF V1, to be removed.
    *
    * @param builder the scan framework builder you create in the
    * {@link #frameworkBuilder(OptionManager, EasySubScan)} method
@@ -614,29 +556,12 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    * the {@link #frameworkBuilder(OptionManager, EasySubScan)} method
    */
   protected void initScanBuilder(FileScanBuilder builder, EasySubScan scan) {
-    builder.projection(scan.getColumns());
-    builder.setUserName(scan.getUserName());
-
-    // Pass along the output schema, if any
-    builder.providedSchema(scan.getSchema());
-
-    // Pass along file path information
-    builder.setFileSystemConfig(easyConfig().fsConf);
-    builder.setFiles(scan.getWorkUnits());
-    final Path selectionRoot = scan.getSelectionRoot();
-    if (selectionRoot != null) {
-      builder.implicitColumnOptions().setSelectionRoot(selectionRoot);
-      
builder.implicitColumnOptions().setPartitionDepth(scan.getPartitionDepth());
-    }
-
-    // Additional error context to identify this plugin
-    builder.errorContext(
-        currentBuilder -> currentBuilder
-            .addContext("Format plugin", easyConfig.getDefaultName())
-            .addContext("Format plugin", 
EasyFormatPlugin.this.getClass().getSimpleName())
-            .addContext("Plugin config name", getName()));
+    EvfV1ScanBuilder.initScanBuilder(this, builder, scan);
   }
 
+  /**
+   * For EVF V1, to be removed.
+   */
   public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
       EasySubScan scan, OptionManager options) throws ExecutionSetupException {
     throw new ExecutionSetupException("Must implement newBatchReader() if 
using the enhanced framework.");
@@ -648,6 +573,8 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
    * rules for projection. It handles "early" or "late" schema readers. A 
typical
    * framework builds on standardized frameworks for files in general or text
    * files in particular.
+   * <p>
+   * For EVF V1, to be removed.
    *
    * @param scan the physical operation definition for the scan operation. 
Contains
    * one or more files to read. (The Easy format plugin works only for files.)
@@ -660,6 +587,28 @@ public abstract class EasyFormatPlugin<T extends 
FormatPluginConfig> implements
     throw new ExecutionSetupException("Must implement frameworkBuilder() if 
using the enhanced framework.");
   }
 
+  /**
+   * Build a "V2" EVF scanner which offers provided schema, built-in LIMIT 
support
+   * and greater convenience compared with the "V1" EVF framework.
+   */
+  private CloseableRecordBatch buildScanV3(FragmentContext context,
+      EasySubScan scan) throws ExecutionSetupException {
+    EasyFileScanBuilder builder = new EasyFileScanBuilder(context, scan, this);
+    configureScan(builder, scan);
+    return builder.buildScanOperator(context, scan);
+  }
+
+  /**
+   * Configure an EVF (v2) scan, which must at least include the factory to
+   * create readers.
+   *
+   * @param builder the builder with default options already set, and which
+   * allows the plugin implementation to set others
+   */
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan 
scan) {
+    throw new UnsupportedOperationException("Implement this method if you use 
EVF V2");
+  }
+
   public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter 
writer) {
     return false;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
new file mode 100644
index 0000000..b486d38
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create the plugin-specific framework that manages the scan. The framework
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles "early" or "late" schema readers. A typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ * <p>
+ * This is for "version 1" of EVF. Newer code should use "version 2."
+ *
+ * The {@link #build()} method returns the scan operator which orchestrates
+ * the scan operation across potentially many files, and declares
+ * {@link ExecutionSetupException} for setup failures.
+ */
+class EvfV1ScanBuilder {
+  private static final Logger logger = 
LoggerFactory.getLogger(EvfV1ScanBuilder.class);
+
+  /**
+   * Builds the readers for the V1 row-set based scan operator.
+   */
+  private static class EasyReaderFactory extends FileReaderFactory {
+
+    private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+    private final EasySubScan scan;
+    private final FragmentContext context;
+
+    public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig> 
plugin,
+        EasySubScan scan, FragmentContext context) {
+      this.plugin = plugin;
+      this.scan = scan;
+      this.context = context;
+    }
+
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      try {
+        return plugin.newBatchReader(scan, context.getOptions());
+      } catch (ExecutionSetupException e) {
+        throw UserException.validationError(e)
+          .addContext("Reason", "Failed to create a batch reader")
+          .addContext(errorContext())
+          .build(logger);
+      }
+    }
+  }
+
+  private final FragmentContext context;
+  private final EasySubScan scan;
+  private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+  public EvfV1ScanBuilder(FragmentContext context, EasySubScan scan,
+      EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+    this.context = context;
+    this.scan = scan;
+    this.plugin = plugin;
+  }
+
+  /**
+   * Revised scanner based on the revised {@link 
org.apache.drill.exec.physical.resultSet.ResultSetLoader}
+   * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader} 
classes.
+   * Handles most projection tasks automatically. Able to limit
+   * vector and batch sizes. This is EVF V1; newer plugins should use EVF V2.
+   */
+  public CloseableRecordBatch build() throws ExecutionSetupException {
+    final FileScanBuilder builder = 
plugin.frameworkBuilder(context.getOptions(), scan);
+
+    // Add batch reader, if none specified
+
+    if (builder.readerFactory() == null) {
+      builder.setReaderFactory(new EasyReaderFactory(plugin, scan, context));
+    }
+    return builder.buildScanOperator(context, scan);
+  }
+
+  /**
+   * Initialize the scan framework builder with standard options.
+   * Call this from the plugin-specific
+   * {@link EasyFormatPlugin#frameworkBuilder(OptionManager, EasySubScan)} method.
+   * The plugin can then customize/revise options as needed.
+   *
+   * @param builder the scan framework builder you create in the
+   * {@link EasyFormatPlugin#frameworkBuilder(OptionManager, EasySubScan)} method
+   * @param scan the physical scan operator definition passed to
+   * the {@link EasyFormatPlugin#frameworkBuilder(OptionManager, EasySubScan)} method
+   */
+  protected static void initScanBuilder(EasyFormatPlugin<? extends 
FormatPluginConfig> plugin,
+      FileScanBuilder builder, EasySubScan scan) {
+    builder.projection(scan.getColumns());
+    builder.setUserName(scan.getUserName());
+
+    // Pass along the output schema, if any
+    builder.providedSchema(scan.getSchema());
+
+    // Pass along file path information
+    builder.setFileSystemConfig(plugin.getFsConf());
+    builder.setFiles(scan.getWorkUnits());
+    final Path selectionRoot = scan.getSelectionRoot();
+    if (selectionRoot != null) {
+      builder.implicitColumnOptions().setSelectionRoot(selectionRoot);
+      
builder.implicitColumnOptions().setPartitionDepth(scan.getPartitionDepth());
+    }
+
+    // Additional error context to identify this plugin
+    builder.errorContext(
+        currentBuilder -> currentBuilder
+            .addContext("Format plugin", plugin.easyConfig().getDefaultName())
+            .addContext("Format plugin", plugin.getClass().getSimpleName())
+            .addContext("Plugin config name", plugin.getName()));
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 6da129e..b04742a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -52,7 +52,7 @@ public class SequenceFileFormatPlugin extends 
EasyFormatPlugin<SequenceFileForma
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .readerOperatorType(OPERATOR_TYPE)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .supportsProjectPushdown(true)
         .defaultName(SequenceFileFormatConfig.NAME)
@@ -73,7 +73,6 @@ public class SequenceFileFormatPlugin extends 
EasyFormatPlugin<SequenceFileForma
     public ManagedReader<? extends FileSchemaNegotiator> newReader() {
       return new SequenceFileBatchReader(config, scan);
     }
-
   }
 
   @Override
@@ -91,4 +90,4 @@ public class SequenceFileFormatPlugin extends 
EasyFormatPlugin<SequenceFileForma
     builder.nullType(Types.optional(MinorType.VARCHAR));
     return builder;
   }
-}
\ No newline at end of file
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 25649cc..a8b5736 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -243,7 +243,7 @@ public class TextFormatPlugin extends 
EasyFormatPlugin<TextFormatPlugin.TextForm
         .fsConf(fsConf)
         .defaultName(PLUGIN_NAME)
         .writerOperatorType(WRITER_OPERATOR_TYPE)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }
@@ -346,7 +346,7 @@ public class TextFormatPlugin extends 
EasyFormatPlugin<TextFormatPlugin.TextForm
     final double estimatedRowSize = 
settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
 
     if (scan.supportsLimitPushdown() && scan.getLimit() > 0) {
-      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, 
(long)scan.getLimit(), 1, data);
+      return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, scan.getLimit(), 
1, data);
     } else {
       final double estRowCount = data / estimatedRowSize;
       return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) 
estRowCount, 1, data);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 67305c9..ddd1b03 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -91,7 +91,7 @@ public class LogFormatPlugin extends 
EasyFormatPlugin<LogFormatConfig> {
         .fsConf(fsConf)
         .defaultName(PLUGIN_NAME)
         .readerOperatorType(OPERATOR_TYPE)
-        .useEnhancedScan(true)
+        .scanVersion(ScanFrameworkVersion.EVF_V1)
         .supportsLimitPushdown(true)
         .build();
   }

Reply via email to