arina-ielchiieva commented on a change in pull request #1683: DRILL-6952: Host
compliant text reader on the row set framework
URL: https://github.com/apache/drill/pull/1683#discussion_r264042521
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
##########
@@ -55,67 +62,335 @@
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+/**
+ * Base class for various file readers.
+ * <p>
+ * This version provides a bridge between the legacy {@link RecordReader}-style
+ * readers and the newer {@link FileBatchReader} style. Over time, split the
+ * class, or provide a cleaner way to handle the differences.
+ *
+ * @param <T> the format plugin config for this reader
+ */
+
public abstract class EasyFormatPlugin<T extends FormatPluginConfig>
implements FormatPlugin {
- @SuppressWarnings("unused")
- private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
+ /**
+ * Defines the static, programmer-defined options for this plugin. These
+ * options are attributes of how the plugin works. The plugin config,
+ * defined in the class definition, provides user-defined options that can
+ * vary across uses of the plugin.
+ */
+
+ public static class EasyFormatConfig {
+ public BasicFormatMatcher matcher;
+ public boolean readable = true;
+ public boolean writable;
+ public boolean blockSplittable;
+ public boolean compressible;
+ public Configuration fsConf;
+ public List<String> extensions;
+ public String defaultName;
+
+ // Config options that, prior to Drill 1.15, required the plugin to
+ // override methods. Moving forward, plugins should be migrated to
+ // use this simpler form. New plugins should use these options
+ // instead of overriding methods.
+
+ public boolean supportsProjectPushdown;
+ public boolean supportsAutoPartitioning;
+ public int readerOperatorType = -1;
+ public int writerOperatorType = -1;
+ }
+
+ /**
+ * Creates the scan batch to use with the plugin. Drill supports the
"classic"
+ * style of scan batch and readers, along with the newer size-aware,
+ * component-based version. The implementation of this class assembles the
+ * readers and scan batch operator as needed for each version.
+ */
+
+ public interface ScanBatchCreator {
+ CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan)
+ throws ExecutionSetupException;
+ }
+
+ /**
+ * Use the original scanner based on the {@link RecordReader} interface.
+ * Requires that the storage plugin roll its own solutions for null columns.
+ * Is not able to limit vector or batch sizes. Retained for backward
+ * compatibility with "classic" format plugins which have not yet been
+ * upgraded to use the new framework.
+ */
+
+ public static class ClassicScanBatchCreator implements ScanBatchCreator {
+
+ private final EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ClassicScanBatchCreator(EasyFormatPlugin<? extends
FormatPluginConfig> plugin) {
+ this.plugin = plugin;
+ }
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context, EasySubScan scan) throws
ExecutionSetupException {
+ final ColumnExplorer columnExplorer = new
ColumnExplorer(context.getOptions(), scan.getColumns());
+
+ if (! columnExplorer.isStarQuery()) {
+ scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(),
scan.getFormatPlugin(),
+ columnExplorer.getTableColumns(), scan.getSelectionRoot(),
scan.getPartitionDepth());
+ scan.setOperatorId(scan.getOperatorId());
+ }
+
+ final OperatorContext oContext = context.newOperatorContext(scan);
+ final DrillFileSystem dfs;
+ try {
+ dfs = oContext.newFileSystem(plugin.easyConfig().fsConf);
+ } catch (final IOException e) {
+ throw new ExecutionSetupException(String.format("Failed to create
FileSystem: %s", e.getMessage()), e);
+ }
+
+ final List<RecordReader> readers = new LinkedList<>();
+ final List<Map<String, String>> implicitColumns = Lists.newArrayList();
+ Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
+ final boolean supportsFileImplicitColumns = scan.getSelectionRoot() !=
null;
+ for (final FileWork work : scan.getWorkUnits()) {
+ final RecordReader recordReader = getRecordReader(
+ plugin, context, dfs, work, scan.getColumns(), scan.getUserName());
+ readers.add(recordReader);
+ final List<String> partitionValues =
ColumnExplorer.listPartitionValues(
+ work.getPath(), scan.getSelectionRoot(), false);
+ final Map<String, String> implicitValues =
columnExplorer.populateImplicitColumns(
+ work.getPath(), partitionValues, supportsFileImplicitColumns);
+ implicitColumns.add(implicitValues);
+ if (implicitValues.size() > mapWithMaxColumns.size()) {
+ mapWithMaxColumns = implicitValues;
+ }
+ }
+
+ // all readers should have the same number of implicit columns, add
missing ones with value null
+ final Map<String, String> diff = Maps.transformValues(mapWithMaxColumns,
Functions.constant((String) null));
+ for (final Map<String, String> map : implicitColumns) {
+ map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
+ }
+
+ return new ScanBatch(context, oContext, readers, implicitColumns);
+ }
+
+ /**
+ * Create a record reader given a file system, a file description and other
+ * information. For backward compatibility, calls the plugin method by
+ * default.
+ *
+ * @param plugin
+ * the plugin creating the scan
+ * @param context
+ * fragment context for the fragment running the scan
+ * @param dfs
+ * Drill's distributed file system facade
+ * @param fileWork
+ * description of the file to scan
+ * @param columns
+ * list of columns to project
+ * @param userName
+ * the name of the user performing the scan
+ * @return a scan operator
+ * @throws ExecutionSetupException
+ * if anything goes wrong
+ */
+
+ public RecordReader getRecordReader(EasyFormatPlugin<? extends
FormatPluginConfig> plugin,
+ FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
+ List<SchemaPath> columns, String userName) throws
ExecutionSetupException {
+ return plugin.getRecordReader(context, dfs, fileWork, columns, userName);
+ }
+ }
+
+ /**
+ * Revised scanner based on the revised
+ * {@link ResultSetLoader} and {@link RowBatchReader} classes.
+ * Handles most projection tasks automatically. Able to limit
+ * vector and batch sizes. Use this for new format plugins.
+ */
+
+ public abstract static class ScanFrameworkCreator
+ implements ScanBatchCreator {
+
+ protected EasyFormatPlugin<? extends FormatPluginConfig> plugin;
+
+ public ScanFrameworkCreator(EasyFormatPlugin<? extends FormatPluginConfig>
plugin) {
+ this.plugin = plugin;
+ }
+
+ /**
+ * Builds the revised {@link FileBatchReader}-based scan batch.
+ *
+ * @param context
+ * @param scan
+ * @return
+ * @throws ExecutionSetupException
+ */
+
+ @Override
+ public CloseableRecordBatch buildScan(
+ final FragmentContext context,
+ final EasySubScan scan) throws ExecutionSetupException {
+
+ // Assemble the scan operator and its wrapper.
+
+ try {
+ final BaseFileScanFramework<?> framework = buildFramework(scan);
+ final Path selectionRoot = scan.getSelectionRoot();
+ if (selectionRoot != null) {
+ framework.setSelectionRoot(selectionRoot, scan.getPartitionDepth());
+ }
+ return new OperatorRecordBatch(
+ context, scan,
+ new ScanOperatorExec(
+ framework));
+ } catch (final UserException e) {
+ // Rethrow user exceptions directly
+ throw e;
+ } catch (final Throwable e) {
+ // Wrap all others
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ /**
+ * Create the plugin-specific framework that manages the scan. The
framework
+ * creates batch readers one by one for each file or block. It defines
semantic
+ * rules for projection. It handles "early" or "late" schema readers. A
typical
+ * framework builds on standardized frameworks for files in general or text
+ * files in particular.
+ *
+ * @param scan the physical operation definition for the scan operation.
Contains
+ * one or more files to read. (The Easy format plugin works only for
files.)
+ * @return the scan framework which orchestrates the scan operation across
+ * potentially many files
+ * @throws ExecutionSetupException for all setup failures
+ */
+ protected abstract BaseFileScanFramework<?> buildFramework(
+ EasySubScan scan) throws ExecutionSetupException;
+ }
+
+ /**
+ * Generic framework creator for files that just use the basic file
+ * support: metadata, etc. Specialized use cases (special "columns"
+ * column, say) will require a specialized implementation.
+ */
+
+ public abstract static class FileScanFrameworkCreator extends
ScanFrameworkCreator {
+
+ private final FileReaderFactory readerCreator;
+
+ public FileScanFrameworkCreator(EasyFormatPlugin<? extends
FormatPluginConfig> plugin,
+ FileReaderFactory readerCreator) {
+ super(plugin);
+ this.readerCreator = readerCreator;
+ }
+
+ @Override
+ protected FileScanFramework buildFramework(
+ EasySubScan scan) throws ExecutionSetupException {
- private final BasicFormatMatcher matcher;
+ final FileScanFramework framework = new FileScanFramework(
+ scan.getColumns(),
+ scan.getWorkUnits(),
+ plugin.easyConfig().fsConf,
+ readerCreator);
+ return framework;
+ }
+ }
+
+ private final String name;
+ private final EasyFormatConfig easyConfig;
private final DrillbitContext context;
- private final boolean readable;
- private final boolean writable;
- private final boolean blockSplittable;
- private final Configuration fsConf;
private final StoragePluginConfig storageConfig;
protected final T formatConfig;
- private final String name;
- private final boolean compressible;
+ /**
+ * Legacy constructor.
+ */
protected EasyFormatPlugin(String name, DrillbitContext context,
Configuration fsConf,
- StoragePluginConfig storageConfig, T formatConfig, boolean readable,
boolean writable, boolean blockSplittable,
- boolean compressible, List<String> extensions, String defaultName){
- this.matcher = new BasicFormatMatcher(this, fsConf, extensions,
compressible);
- this.readable = readable;
- this.writable = writable;
+ StoragePluginConfig storageConfig, T formatConfig, boolean readable,
boolean writable,
+ boolean blockSplittable,
+ boolean compressible, List<String> extensions, String defaultName) {
+ this.name = name == null ? defaultName : name;
+ easyConfig = new EasyFormatConfig();
+ easyConfig.matcher = new BasicFormatMatcher(this, fsConf, extensions,
compressible);
+ easyConfig.readable = readable;
+ easyConfig.writable = writable;
this.context = context;
- this.blockSplittable = blockSplittable;
- this.compressible = compressible;
- this.fsConf = fsConf;
+ easyConfig.blockSplittable = blockSplittable;
+ easyConfig.compressible = compressible;
+ easyConfig.fsConf = fsConf;
this.storageConfig = storageConfig;
this.formatConfig = formatConfig;
- this.name = name == null ? defaultName : name;
}
- @Override
- public Configuration getFsConf() {
- return fsConf;
+ /**
+ * Revised constructor in which settings are gathered into a configuration
object.
+ *
+ * @param name name of the plugin
+ * @param config configuration options for this plugin which determine
+ * developer-defined runtime behavior
+ * @param context the global server-wide drillbit context
+ * @param storageConfig the configuration for the storage plugin that owns
this
+ * foramt plugin
Review comment:
```suggestion
* format plugin
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services