This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new e692fc6 DRILL-8085: EVF V2 support in the "Easy" format plugin (#2419)
e692fc6 is described below
commit e692fc6b621838cfd4df5b4be878892abae234ce
Author: Paul Rogers <[email protected]>
AuthorDate: Tue Mar 1 00:35:57 2022 -0800
DRILL-8085: EVF V2 support in the "Easy" format plugin (#2419)
---
.../drill/exec/store/esri/ShpFormatPlugin.java | 2 +-
.../drill/exec/store/excel/ExcelFormatPlugin.java | 2 +-
.../drill/exec/store/hdf5/HDF5FormatPlugin.java | 2 +-
.../exec/store/httpd/HttpdLogFormatPlugin.java | 3 +-
.../drill/exec/store/image/ImageFormatPlugin.java | 3 +-
.../store/pcap/plugin/BasePcapFormatPlugin.java | 3 +-
.../drill/exec/store/pdf/PdfFormatPlugin.java | 3 +-
.../drill/exec/store/sas/SasFormatPlugin.java | 3 +-
.../drill/exec/store/spss/SpssFormatPlugin.java | 3 +-
.../exec/store/syslog/SyslogFormatPlugin.java | 3 +-
.../drill/exec/store/xml/XMLFormatPlugin.java | 3 +-
.../drill/exec/physical/impl/scan/ReaderState.java | 6 +-
.../drill/exec/store/avro/AvroFormatPlugin.java | 2 +-
.../exec/store/dfs/easy/ClassicScanBuilder.java | 103 ++++++++++
.../exec/store/dfs/easy/EasyFileScanBuilder.java | 83 ++++++++
.../exec/store/dfs/easy/EasyFormatPlugin.java | 221 ++++++++-------------
.../exec/store/dfs/easy/EvfV1ScanBuilder.java | 142 +++++++++++++
.../sequencefile/SequenceFileFormatPlugin.java | 5 +-
.../exec/store/easy/text/TextFormatPlugin.java | 4 +-
.../drill/exec/store/log/LogFormatPlugin.java | 2 +-
20 files changed, 439 insertions(+), 159 deletions(-)
diff --git
a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index d93243c..1d84491 100644
---
a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++
b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -78,7 +78,7 @@ public class ShpFormatPlugin extends
EasyFormatPlugin<ShpFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(PLUGIN_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
index 58cae94..063f1f7 100644
---
a/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
+++
b/contrib/format-excel/src/main/java/org/apache/drill/exec/store/excel/ExcelFormatPlugin.java
@@ -73,7 +73,7 @@ public class ExcelFormatPlugin extends
EasyFormatPlugin<ExcelFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
index e26ed67..5551af5 100644
---
a/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
+++
b/contrib/format-hdf5/src/main/java/org/apache/drill/exec/store/hdf5/HDF5FormatPlugin.java
@@ -56,7 +56,7 @@ public class HDF5FormatPlugin extends
EasyFormatPlugin<HDF5FormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index c3120d1..ab6d80a 100644
---
a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++
b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class HttpdLogFormatPlugin extends
EasyFormatPlugin<HttpdLogFormatConfig> {
@@ -75,7 +76,7 @@ public class HttpdLogFormatPlugin extends
EasyFormatPlugin<HttpdLogFormatConfig>
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
.readerOperatorType(OPERATOR_TYPE)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 22ee644..977faec 100644
---
a/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++
b/contrib/format-image/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
@@ -49,7 +50,7 @@ public class ImageFormatPlugin extends
EasyFormatPlugin<ImageFormatConfig> {
.compressible(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(ImageFormatConfig.NAME)
diff --git
a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
index 4052245..836c254 100644
---
a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
+++
b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.drill.exec.store.pcap.PcapBatchReader;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
@@ -63,7 +64,7 @@ public abstract class BasePcapFormatPlugin<T extends
PcapFormatConfig> extends E
.compressible(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(PcapFormatConfig.NAME)
diff --git
a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
index 01ceece..53fb487 100644
---
a/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
+++
b/contrib/format-pdf/src/main/java/org/apache/drill/exec/store/pdf/PdfFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +68,7 @@ public class PdfFormatPlugin extends
EasyFormatPlugin<PdfFormatConfig> {
.extensions(pluginConfig.extensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
index 442e48b..da8bcbc 100644
---
a/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
+++
b/contrib/format-sas/src/main/java/org/apache/drill/exec/store/sas/SasFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -68,7 +69,7 @@ public class SasFormatPlugin extends
EasyFormatPlugin<SasFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 7e3e94f..e62699a 100644
---
a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++
b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +68,7 @@ public class SpssFormatPlugin extends
EasyFormatPlugin<SpssFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index b439c5e..aec9369 100644
---
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
@@ -69,7 +70,7 @@ public class SyslogFormatPlugin extends
EasyFormatPlugin<SyslogFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
index b0a3e82..14ce069 100644
---
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
+++
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import
org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
@@ -67,7 +68,7 @@ public class XMLFormatPlugin extends
EasyFormatPlugin<XMLFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index a9a6fe9..ac83b49 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -137,7 +137,6 @@ class ReaderState {
/**
* Initial state before opening the reader.
*/
-
START,
/**
@@ -157,7 +156,6 @@ class ReaderState {
* the next call to {@link ReaderState#next()} will return this look-ahead
* batch rather than reading a new one.
*/
-
LOOK_AHEAD,
/**
@@ -173,20 +171,17 @@ class ReaderState {
* row in the result set loader. That look-ahead is handled by the
* (shim) reader which this class manages.
*/
-
LOOK_AHEAD_WITH_EOF,
/**
* Normal state: the reader has supplied data but not yet reported EOF.
*/
-
ACTIVE,
/**
* The reader has reported EOF. No look-ahead batch is active. The
* reader's next() method will no longer be called.
*/
-
EOF,
/**
@@ -250,6 +245,7 @@ class ReaderState {
throw UserException.validationError(e)
.message("Invalid runtime type conversion")
.build(logger);
+
} catch (Throwable t) {
// Wrap all others in a user exception.
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5051cb4..5235b00 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -53,7 +53,7 @@ public class AvroFormatPlugin extends
EasyFormatPlugin<AvroFormatConfig> {
.extensions(formatConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
new file mode 100644
index 0000000..08442c7
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/ClassicScanBuilder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.base.Functions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+/**
+ * Build the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained for backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+public class ClassicScanBuilder {
+
+ private final FragmentContext context;
+ private EasySubScan scan;
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ClassicScanBuilder(FragmentContext context, EasySubScan scan,
+ EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.context = context;
+ this.scan = scan;
+ this.plugin = plugin;
+ }
+
+ public CloseableRecordBatch build() throws ExecutionSetupException {
+ final ColumnExplorer columnExplorer =
+ new ColumnExplorer(context.getOptions(), scan.getColumns());
+
+ if (! columnExplorer.isStarQuery()) {
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(),
scan.getFormatPlugin(),
+ columnExplorer.getTableColumns(), scan.getSelectionRoot(),
+ scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
+ scan.setOperatorId(scan.getOperatorId());
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(scan);
+ final DrillFileSystem dfs;
+ try {
+ dfs = oContext.newFileSystem(plugin.getFsConf());
+ } catch (final IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create
FileSystem: %s", e.getMessage()), e);
+ }
+
+ final List<RecordReader> readers = new LinkedList<>();
+ final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+ Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+ final boolean supportsFileImplicitColumns = scan.getSelectionRoot() !=
null;
+ for (final FileWork work : scan.getWorkUnits()) {
+ final RecordReader recordReader = plugin.getRecordReader(
+ context, dfs, work, scan.getColumns(), scan.getUserName());
+ readers.add(recordReader);
+ final List<String> partitionValues = ColumnExplorer.listPartitionValues(
+ work.getPath(), scan.getSelectionRoot(), false);
+ final Map<String, String> implicitValues =
columnExplorer.populateColumns(
+ work.getPath(), partitionValues, supportsFileImplicitColumns, dfs);
+ implicitColumns.add(implicitValues);
+ if (implicitValues.size() > mapWithMaxColumns.size()) {
+ mapWithMaxColumns = implicitValues;
+ }
+ }
+
+ // all readers should have the same number of implicit columns, add
missing ones with value null
+ final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns,
Functions.constant(null));
+ for (final Map<String, String> map : implicitColumns) {
+ map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+ }
+
+ return new ScanBatch(context, oContext, readers, implicitColumns);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
new file mode 100644
index 0000000..65f9c71
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFileScanBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException.Builder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import
org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+
+/**
+ * Create the file scan lifecycle that manages the scan. The lifecycle
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles a provided schema. Also handles "early"
+ * or "late" schema readers. This version handles the scan limit automatically.
+ * <p>
+ * This is for "version 2" of EVF. Newer code should use this version.
+ */
+public class EasyFileScanBuilder extends FileScanLifecycleBuilder {
+
+ public static class EvfErrorContext implements CustomErrorContext {
+ private final EasySubScan scan;
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public EvfErrorContext(EasySubScan scan,
+ EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.scan = scan;
+ this.plugin = plugin;
+ }
+
+ @Override
+ public void addContext(Builder builder) {
+ builder
+ .addContext("Format plugin type", plugin.easyConfig().getDefaultName())
+ .addContext("Format plugin class", plugin.getClass().getSimpleName())
+ .addContext("Plugin config name", plugin.getName());
+ if (scan.getSelectionRoot() != null) {
+ builder.addContext("Table directory",
scan.getSelectionRoot().toString());
+ }
+ }
+ }
+
+ /**
+ * Constructor
+ *
+ * @param scan the physical operation definition for the scan operation.
Contains
+ * one or more files to read. (The Easy format plugin works only for files.)
+ * @return the scan framework which orchestrates the scan operation across
+ * potentially many files
+ * @throws ExecutionSetupException for all setup failures
+ */
+ public EasyFileScanBuilder(FragmentContext context, EasySubScan scan,
+ EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+
+ options(context.getOptions());
+ projection(scan.getColumns());
+ userName(scan.getUserName());
+ providedSchema(scan.getSchema());
+ fileSystemConfig(plugin.getFsConf());
+ fileSplitImpls(scan.getWorkUnits());
+ rootDir(scan.getSelectionRoot());
+ maxPartitionDepth(scan.getPartitionDepth());
+ compressible(plugin.easyConfig().isCompressible());
+ limit(scan.getLimit());
+ errorContext(new EvfErrorContext(scan, plugin));
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d701da4..15ddf6b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -19,10 +19,8 @@ package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -31,8 +29,8 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -41,17 +39,16 @@ import
org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.StatisticsWriterRecordBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import
org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.StatisticsRecordWriter;
@@ -62,16 +59,10 @@ import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
-import org.apache.drill.shaded.guava.com.google.common.base.Functions;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -84,7 +75,28 @@ import org.slf4j.LoggerFactory;
* @param <T> the format plugin config for this reader
*/
public abstract class EasyFormatPlugin<T extends FormatPluginConfig>
implements FormatPlugin {
- private static final Logger logger =
LoggerFactory.getLogger(EasyFormatPlugin.class);
+
+ public enum ScanFrameworkVersion {
+
+ /**
+ * Use the "classic" low-level implementation based on the Scan Batch.
+ */
+ CLASSIC,
+
+ /**
+ * Use the first-generation "EVF" framework.
+ */
+ EVF_V1,
+
+ /**
+ * Use the second-generation EVF framework. New code should use this
option.
+ * EVF V2 automatically handles LIMIT push-down. Readers that use it should
+ * check the return value of {@code startBatch()} to detect when they
should
+ * stop reading. Readers can also call {@code atLimit()} after each
+ * {@code harvest()} call to detect the limit and report EOF.
+ */
+ EVF_V2
+ }
/**
* Defines the static, programmer-defined options for this plugin. These
@@ -120,7 +132,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* structure. Can also be selected at runtime by overriding
* {@link #useEnhancedScan()}.
*/
- private final boolean useEnhancedScan;
+ private final ScanFrameworkVersion scanVersion;
public EasyFormatConfig(EasyFormatConfigBuilder builder) {
this.matcher = builder.matcher;
@@ -138,7 +150,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
this.supportsStatistics = builder.supportsStatistics;
this.readerOperatorType = builder.readerOperatorType;
this.writerOperatorType = builder.writerOperatorType;
- this.useEnhancedScan = builder.useEnhancedScan;
+ this.scanVersion = builder.scanVersion;
}
public BasicFormatMatcher getMatcher() {
@@ -201,8 +213,8 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
return writerOperatorType;
}
- public boolean useEnhancedScan() {
- return useEnhancedScan;
+ public ScanFrameworkVersion scanVersion() {
+ return scanVersion;
}
public static EasyFormatConfigBuilder builder() {
@@ -226,7 +238,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
.supportsStatistics(supportsStatistics)
.readerOperatorType(readerOperatorType)
.writerOperatorType(writerOperatorType)
- .useEnhancedScan(useEnhancedScan);
+ .scanVersion(scanVersion);
}
}
@@ -246,7 +258,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
private boolean supportsStatistics;
private String readerOperatorType;
private String writerOperatorType = "";
- private boolean useEnhancedScan;
+ private ScanFrameworkVersion scanVersion = ScanFrameworkVersion.CLASSIC;
public EasyFormatConfigBuilder matcher(BasicFormatMatcher matcher) {
this.matcher = matcher;
@@ -328,8 +340,8 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
return this;
}
- public EasyFormatConfigBuilder useEnhancedScan(boolean useEnhancedScan) {
- this.useEnhancedScan = useEnhancedScan;
+ public EasyFormatConfigBuilder scanVersion(ScanFrameworkVersion
scanVersion) {
+ this.scanVersion = scanVersion;
return this;
}
@@ -342,35 +354,6 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
}
}
- /**
- * Builds the readers row-set based scan operator.
- */
- private static class EasyReaderFactory extends FileReaderFactory {
-
- private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
- private final EasySubScan scan;
- private final FragmentContext context;
-
- public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig>
plugin,
- EasySubScan scan, FragmentContext context) {
- this.plugin = plugin;
- this.scan = scan;
- this.context = context;
- }
-
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- try {
- return plugin.newBatchReader(scan, context.getOptions());
- } catch (ExecutionSetupException e) {
- throw UserException.validationError(e)
- .addContext("Reason", "Failed to create a batch reader")
- .addContext(errorContext())
- .build(logger);
- }
- }
- }
-
private final String name;
private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
@@ -410,7 +393,6 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* @param formatConfig the Jackson-serialized format configuration as created
* by the user in the Drill web console. Holds user-defined options
*/
-
protected EasyFormatPlugin(String name, EasyFormatConfig config,
DrillbitContext context,
StoragePluginConfig storageConfig, T formatConfig) {
this.name = name;
@@ -502,10 +484,25 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
protected CloseableRecordBatch getReaderBatch(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
- if (useEnhancedScan()) {
- return buildScan(context, scan);
- } else {
- return buildScanBatch(context, scan);
+ try {
+ switch (scanVersion(context.getOptions())) {
+ case CLASSIC:
+ return buildScanBatch(context, scan);
+ case EVF_V1:
+ return buildScan(context, scan);
+ case EVF_V2:
+ return buildScanV3(context, scan);
+ default:
+ throw new IllegalStateException("Unknown scan version");
+ }
+ } catch (final UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (final ExecutionSetupException e) {
+ throw e;
+ } catch (final Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
}
}
@@ -518,8 +515,8 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* @return true to use the enhanced scan framework, false for the
* traditional scan-batch framework
*/
- protected boolean useEnhancedScan() {
- return easyConfig.useEnhancedScan();
+ protected ScanFrameworkVersion scanVersion(OptionManager options) {
+ return easyConfig.scanVersion;
}
/**
@@ -531,49 +528,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
*/
private CloseableRecordBatch buildScanBatch(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
- final ColumnExplorer columnExplorer =
- new ColumnExplorer(context.getOptions(), scan.getColumns());
-
- if (! columnExplorer.isStarQuery()) {
- scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(),
scan.getFormatPlugin(),
- columnExplorer.getTableColumns(), scan.getSelectionRoot(),
- scan.getPartitionDepth(), scan.getSchema(), scan.getMaxRecords());
- scan.setOperatorId(scan.getOperatorId());
- }
-
- final OperatorContext oContext = context.newOperatorContext(scan);
- final DrillFileSystem dfs;
- try {
- dfs = oContext.newFileSystem(easyConfig().fsConf);
- } catch (final IOException e) {
- throw new ExecutionSetupException(String.format("Failed to create
FileSystem: %s", e.getMessage()), e);
- }
-
- final List<RecordReader> readers = new LinkedList<>();
- final List<Map<String, String>> implicitColumns = Lists.newArrayList();
- Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
- final boolean supportsFileImplicitColumns = scan.getSelectionRoot() !=
null;
- for (final FileWork work : scan.getWorkUnits()) {
- final RecordReader recordReader = getRecordReader(
- context, dfs, work, scan.getColumns(), scan.getUserName());
- readers.add(recordReader);
- final List<String> partitionValues = ColumnExplorer.listPartitionValues(
- work.getPath(), scan.getSelectionRoot(), false);
- final Map<String, String> implicitValues =
columnExplorer.populateColumns(
- work.getPath(), partitionValues, supportsFileImplicitColumns, dfs);
- implicitColumns.add(implicitValues);
- if (implicitValues.size() > mapWithMaxColumns.size()) {
- mapWithMaxColumns = implicitValues;
- }
- }
-
- // all readers should have the same number of implicit columns, add
missing ones with value null
- final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns,
Functions.constant(null));
- for (final Map<String, String> map : implicitColumns) {
- map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
- }
-
- return new ScanBatch(context, oContext, readers, implicitColumns);
+ return new ClassicScanBuilder(context, scan, this).build();
}
/**
@@ -584,22 +539,7 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
*/
private CloseableRecordBatch buildScan(FragmentContext context,
EasySubScan scan) throws ExecutionSetupException {
- try {
- final FileScanBuilder builder = frameworkBuilder(context.getOptions(),
scan);
-
- // Add batch reader, if none specified
-
- if (builder.readerFactory() == null) {
- builder.setReaderFactory(new EasyReaderFactory(this, scan, context));
- }
- return builder.buildScanOperator(context, scan);
- } catch (final UserException e) {
- // Rethrow user exceptions directly
- throw e;
- } catch (final Throwable e) {
- // Wrap all others
- throw new ExecutionSetupException(e);
- }
+ return new EvfV1ScanBuilder(context, scan, this).build();
}
/**
@@ -607,6 +547,8 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* Call this from the plugin-specific
* {@link #frameworkBuilder(OptionManager, EasySubScan)} method.
* The plugin can then customize/revise options as needed.
+ * <p>
+ * For EVF V1, to be removed.
*
* @param builder the scan framework builder you create in the
* {@link #frameworkBuilder(OptionManager, EasySubScan)} method
@@ -614,29 +556,12 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* the {@link #frameworkBuilder(OptionManager, EasySubScan)} method
*/
protected void initScanBuilder(FileScanBuilder builder, EasySubScan scan) {
- builder.projection(scan.getColumns());
- builder.setUserName(scan.getUserName());
-
- // Pass along the output schema, if any
- builder.providedSchema(scan.getSchema());
-
- // Pass along file path information
- builder.setFileSystemConfig(easyConfig().fsConf);
- builder.setFiles(scan.getWorkUnits());
- final Path selectionRoot = scan.getSelectionRoot();
- if (selectionRoot != null) {
- builder.implicitColumnOptions().setSelectionRoot(selectionRoot);
-
builder.implicitColumnOptions().setPartitionDepth(scan.getPartitionDepth());
- }
-
- // Additional error context to identify this plugin
- builder.errorContext(
- currentBuilder -> currentBuilder
- .addContext("Format plugin", easyConfig.getDefaultName())
- .addContext("Format plugin",
EasyFormatPlugin.this.getClass().getSimpleName())
- .addContext("Plugin config name", getName()));
+ EvfV1ScanBuilder.initScanBuilder(this, builder, scan);
}
+ /**
+ * For EVF V1, to be removed.
+ */
public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
EasySubScan scan, OptionManager options) throws ExecutionSetupException {
throw new ExecutionSetupException("Must implement newBatchReader() if
using the enhanced framework.");
@@ -648,6 +573,8 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
* rules for projection. It handles "early" or "late" schema readers. A
typical
* framework builds on standardized frameworks for files in general or text
* files in particular.
+ * <p>
+ * For EVF V1, to be removed.
*
* @param scan the physical operation definition for the scan operation.
Contains
* one or more files to read. (The Easy format plugin works only for files.)
@@ -660,6 +587,28 @@ public abstract class EasyFormatPlugin<T extends
FormatPluginConfig> implements
throw new ExecutionSetupException("Must implement frameworkBuilder() if
using the enhanced framework.");
}
+ /**
+ * Build a "V2" EVF scanner which offers provided schema, built-in LIMIT
support
+ * and greater convenience compared with the "V1" EVF framework.
+ */
+ private CloseableRecordBatch buildScanV3(FragmentContext context,
+ EasySubScan scan) throws ExecutionSetupException {
+ EasyFileScanBuilder builder = new EasyFileScanBuilder(context, scan, this);
+ configureScan(builder, scan);
+ return builder.buildScanOperator(context, scan);
+ }
+
+ /**
+ * Configure an EVF (v2) scan, which must at least include the factory to
+ * create readers.
+ *
+ * @param builder the builder with default options already set, and which
+ * allows the plugin implementation to set others
+ */
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan
scan) {
+ throw new UnsupportedOperationException("Implement this method if you use
EVF V2");
+ }
+
public boolean isStatisticsRecordWriter(FragmentContext context, EasyWriter
writer) {
return false;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
new file mode 100644
index 0000000..b486d38
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EvfV1ScanBuilder.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs.easy;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.ops.FragmentContext;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create the plugin-specific framework that manages the scan. The framework
+ * creates batch readers one by one for each file or block. It defines semantic
+ * rules for projection. It handles "early" or "late" schema readers. A typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ * <p>
+ * This is for "version 1" of EVF. Newer code should use "version 2."
+ *
+ * The resulting scan framework orchestrates the scan operation across
+ * potentially many files, creating one batch reader per file or block.
+ * Setup failures surface as {@code ExecutionSetupException}.
+ */
+class EvfV1ScanBuilder {
+ private static final Logger logger =
LoggerFactory.getLogger(EvfV1ScanBuilder.class);
+
+ /**
+ * Builds the readers for the V1 row-set based scan operator.
+ */
+ private static class EasyReaderFactory extends FileReaderFactory {
+
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+ private final EasySubScan scan;
+ private final FragmentContext context;
+
+ public EasyReaderFactory(EasyFormatPlugin<? extends FormatPluginConfig>
plugin,
+ EasySubScan scan, FragmentContext context) {
+ this.plugin = plugin;
+ this.scan = scan;
+ this.context = context;
+ }
+
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ try {
+ return plugin.newBatchReader(scan, context.getOptions());
+ } catch (ExecutionSetupException e) {
+ throw UserException.validationError(e)
+ .addContext("Reason", "Failed to create a batch reader")
+ .addContext(errorContext())
+ .build(logger);
+ }
+ }
+ }
+
+ private final FragmentContext context;
+ private final EasySubScan scan;
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public EvfV1ScanBuilder(FragmentContext context, EasySubScan scan,
+ EasyFormatPlugin<? extends FormatPluginConfig> plugin) {
+ this.context = context;
+ this.scan = scan;
+ this.plugin = plugin;
+ }
+
+ /**
+ * Revised scanner based on the revised {@link
org.apache.drill.exec.physical.resultSet.ResultSetLoader}
+ * and {@link org.apache.drill.exec.physical.impl.scan.RowBatchReader}
classes.
+ * Handles most projection tasks automatically. Able to limit
+ * vector and batch sizes. Use this for new format plugins.
+ */
+ public CloseableRecordBatch build() throws ExecutionSetupException {
+ final FileScanBuilder builder =
plugin.frameworkBuilder(context.getOptions(), scan);
+
+ // Add batch reader, if none specified
+
+ if (builder.readerFactory() == null) {
+ builder.setReaderFactory(new EasyReaderFactory(plugin, scan, context));
+ }
+ return builder.buildScanOperator(context, scan);
+ }
+
+ /**
+ * Initialize the scan framework builder with standard options.
+ * Called from
+ * {@link EasyFormatPlugin#frameworkBuilder(OptionManager, EasySubScan)}.
+ * The plugin can then customize/revise options as needed.
+ *
+ * @param plugin the format plugin that supplies configuration such as
+ * the file system config and the error-context names
+ * @param builder the scan framework builder to initialize
+ * @param scan the physical scan operator definition for this scan
+ */
+ protected static void initScanBuilder(EasyFormatPlugin<? extends
FormatPluginConfig> plugin,
+ FileScanBuilder builder, EasySubScan scan) {
+ builder.projection(scan.getColumns());
+ builder.setUserName(scan.getUserName());
+
+ // Pass along the output schema, if any
+ builder.providedSchema(scan.getSchema());
+
+ // Pass along file path information
+ builder.setFileSystemConfig(plugin.getFsConf());
+ builder.setFiles(scan.getWorkUnits());
+ final Path selectionRoot = scan.getSelectionRoot();
+ if (selectionRoot != null) {
+ builder.implicitColumnOptions().setSelectionRoot(selectionRoot);
+
builder.implicitColumnOptions().setPartitionDepth(scan.getPartitionDepth());
+ }
+
+ // Additional error context to identify this plugin
+ builder.errorContext(
+ currentBuilder -> currentBuilder
+ .addContext("Format plugin", plugin.easyConfig().getDefaultName())
+ .addContext("Format plugin", plugin.getClass().getSimpleName())
+ .addContext("Plugin config name", plugin.getName()));
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 6da129e..b04742a 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -52,7 +52,7 @@ public class SequenceFileFormatPlugin extends
EasyFormatPlugin<SequenceFileForma
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.readerOperatorType(OPERATOR_TYPE)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(SequenceFileFormatConfig.NAME)
@@ -73,7 +73,6 @@ public class SequenceFileFormatPlugin extends
EasyFormatPlugin<SequenceFileForma
public ManagedReader<? extends FileSchemaNegotiator> newReader() {
return new SequenceFileBatchReader(config, scan);
}
-
}
@Override
@@ -91,4 +90,4 @@ public class SequenceFileFormatPlugin extends
EasyFormatPlugin<SequenceFileForma
builder.nullType(Types.optional(MinorType.VARCHAR));
return builder;
}
-}
\ No newline at end of file
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 25649cc..a8b5736 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -243,7 +243,7 @@ public class TextFormatPlugin extends
EasyFormatPlugin<TextFormatPlugin.TextForm
.fsConf(fsConf)
.defaultName(PLUGIN_NAME)
.writerOperatorType(WRITER_OPERATOR_TYPE)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}
@@ -346,7 +346,7 @@ public class TextFormatPlugin extends
EasyFormatPlugin<TextFormatPlugin.TextForm
final double estimatedRowSize =
settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
if (scan.supportsLimitPushdown() && scan.getLimit() > 0) {
- return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT,
(long)scan.getLimit(), 1, data);
+ return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, scan.getLimit(),
1, data);
} else {
final double estRowCount = data / estimatedRowSize;
return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long)
estRowCount, 1, data);
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 67305c9..ddd1b03 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -91,7 +91,7 @@ public class LogFormatPlugin extends
EasyFormatPlugin<LogFormatConfig> {
.fsConf(fsConf)
.defaultName(PLUGIN_NAME)
.readerOperatorType(OPERATOR_TYPE)
- .useEnhancedScan(true)
+ .scanVersion(ScanFrameworkVersion.EVF_V1)
.supportsLimitPushdown(true)
.build();
}