paul-rogers commented on a change in pull request #1962:
URL: https://github.com/apache/drill/pull/1962#discussion_r415183369



##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
##########
@@ -15,78 +15,77 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.drill.exec.store.ltsv;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
 
+/**
+ * Enables Drill to query data in Labeled Tab Separated Values (LTSV) format.
+ * <a href="http://ltsv.org" target="_blank">LTSV Spec</a>
+ */
 public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> 
{
 
-  private static final boolean IS_COMPRESSIBLE = true;
-
   private static final String DEFAULT_NAME = "ltsv";
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration 
fsConf, StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new 
LTSVFormatPluginConfig(null));
-  }
+  public static class LTSVReaderFactory extends FileReaderFactory {
+    private final LTSVFormatPluginConfig config;
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration 
fsConf, StoragePluginConfig config, LTSVFormatPluginConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
-  }
+    public LTSVReaderFactory(LTSVFormatPluginConfig config) {
+      this.config = config;
+    }
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem 
dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
-    return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new LTSVBatchReader();

Review comment:
       This class holds on to a `config` but never passes it to the reader, 
so there is no need to store the config here.

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
##########
@@ -15,78 +15,77 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.drill.exec.store.ltsv;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
 
+/**
+ * Enables Drill to query data in Labeled Tab Separated Values (LTSV) format.
+ * <a href="http://ltsv.org" target="_blank">LTSV Spec</a>
+ */
 public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> 
{
 
-  private static final boolean IS_COMPRESSIBLE = true;
-
   private static final String DEFAULT_NAME = "ltsv";
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration 
fsConf, StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new 
LTSVFormatPluginConfig(null));
-  }
+  public static class LTSVReaderFactory extends FileReaderFactory {
+    private final LTSVFormatPluginConfig config;
 
-  public LTSVFormatPlugin(String name, DrillbitContext context, Configuration 
fsConf, StoragePluginConfig config, LTSVFormatPluginConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, 
false, IS_COMPRESSIBLE, formatPluginConfig.getExtensions(), DEFAULT_NAME);
-  }
+    public LTSVReaderFactory(LTSVFormatPluginConfig config) {
+      this.config = config;
+    }
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem 
dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
-    return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new LTSVBatchReader();
+    }
   }
 
-
-  @Override
-  public int getReaderOperatorType() {
-    return UserBitShared.CoreOperatorType.LTSV_SUB_SCAN_VALUE;
+  public LTSVFormatPlugin(String name, DrillbitContext context,
+                         Configuration fsConf, StoragePluginConfig 
storageConfig,
+                          LTSVFormatPluginConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
   }
 
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException("Drill doesn't currently support 
writing to LTSV files.");
-  }
-
-  @Override
-  public boolean supportsPushDown() {
-    return true;
-  }
-
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) {
-    throw new UnsupportedOperationException("Drill doesn't currently support 
writing to LTSV files.");
-  }
-
-  @Override
-  public boolean supportsStatistics() {
-    return false;
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
LTSVFormatPluginConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = true;

Review comment:
       The original code passed `false` for the `blockSplittable` parameter, 
but this change sets it to `true`. Which is right?

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordIterator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.EasyEVFIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class LTSVRecordIterator implements EasyEVFIterator {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVRecordIterator.class);
+
+  private final RowSetLoader rowWriter;
+
+  private final BufferedReader reader;
+
+  private String line;
+
+  private int recordCount;
+
+  public LTSVRecordIterator(RowSetLoader rowWriter, BufferedReader reader) {
+    this.rowWriter = rowWriter;
+    this.reader = reader;
+  }
+
+  public boolean nextRow() {
+    // Get the line
+    try {
+      line = reader.readLine();
+
+      // Increment record counter
+      recordCount++;
+
+      if (line == null) {
+        return false;
+      } else if (line.trim().length() == 0) {
+        // Skip empty lines
+        return true;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error reading LTSV Data: %s", e.getMessage())
+        .addContext("Line %d: %s", recordCount, line)
+        .build(logger);
+    }
+
+    // Process the row

Review comment:
       Nit: Not a super-useful comment; the method name conveys the same info.

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordIterator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.EasyEVFIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class LTSVRecordIterator implements EasyEVFIterator {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVRecordIterator.class);
+
+  private final RowSetLoader rowWriter;
+
+  private final BufferedReader reader;
+
+  private String line;
+
+  private int recordCount;
+
+  public LTSVRecordIterator(RowSetLoader rowWriter, BufferedReader reader) {
+    this.rowWriter = rowWriter;
+    this.reader = reader;
+  }
+
+  public boolean nextRow() {
+    // Get the line
+    try {
+      line = reader.readLine();
+
+      // Increment record counter
+      recordCount++;
+
+      if (line == null) {

Review comment:
       Did you want to increment `recordCount` even when no record was 
returned? As written, an empty file ends up with a record count of 1 rather than 0.

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordIterator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.EasyEVFIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class LTSVRecordIterator implements EasyEVFIterator {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVRecordIterator.class);
+
+  private final RowSetLoader rowWriter;
+
+  private final BufferedReader reader;
+
+  private String line;
+
+  private int recordCount;
+
+  public LTSVRecordIterator(RowSetLoader rowWriter, BufferedReader reader) {
+    this.rowWriter = rowWriter;
+    this.reader = reader;
+  }
+
+  public boolean nextRow() {
+    // Get the line
+    try {
+      line = reader.readLine();
+
+      // Increment record counter
+      recordCount++;
+
+      if (line == null) {
+        return false;
+      } else if (line.trim().length() == 0) {
+        // Skip empty lines
+        return true;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error reading LTSV Data: %s", e.getMessage())
+        .addContext("Line %d: %s", recordCount, line)
+        .build(logger);
+    }
+
+    // Process the row
+    processRow();
+
+    return true;
+  }
+
+  /**
+   * Function processes one row of data, splitting it up first by tabs then 
splitting the key/value pairs
+   * finally recording it in the current Drill row.
+   */
+  private void processRow() {
+
+    for (String field : line.split("\t")) {
+      int index = field.indexOf(":");
+      if (index <= 0) {
+        throw UserException
+          .dataReadError()
+          .message("Invalid LTSV format at line %d: %s", recordCount, line)

Review comment:
       Would be useful to pass along the error context from the schema 
negotiator so you get info about the plugin, file name, etc.

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordIterator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.EasyEVFIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class LTSVRecordIterator implements EasyEVFIterator {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVRecordIterator.class);
+
+  private final RowSetLoader rowWriter;
+
+  private final BufferedReader reader;
+
+  private String line;
+
+  private int recordCount;
+
+  public LTSVRecordIterator(RowSetLoader rowWriter, BufferedReader reader) {
+    this.rowWriter = rowWriter;
+    this.reader = reader;
+  }
+
+  public boolean nextRow() {
+    // Get the line
+    try {
+      line = reader.readLine();
+
+      // Increment record counter
+      recordCount++;
+
+      if (line == null) {
+        return false;
+      } else if (line.trim().length() == 0) {
+        // Skip empty lines
+        return true;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error reading LTSV Data: %s", e.getMessage())
+        .addContext("Line %d: %s", recordCount, line)
+        .build(logger);
+    }
+
+    // Process the row
+    processRow();
+
+    return true;
+  }
+
+  /**
+   * Function processes one row of data, splitting it up first by tabs then 
splitting the key/value pairs
+   * finally recording it in the current Drill row.
+   */
+  private void processRow() {
+
+    for (String field : line.split("\t")) {
+      int index = field.indexOf(":");

Review comment:
       Since `String.split()` is already used for the whole line, you might as 
well use it for each field too. However, splitting is likely slower than a 
hand-written parse because of the extra copying and intermediate objects.

##########
File path: 
contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordIterator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.ltsv;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.easy.EasyEVFIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+
+public class LTSVRecordIterator implements EasyEVFIterator {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(LTSVRecordIterator.class);
+
+  private final RowSetLoader rowWriter;
+
+  private final BufferedReader reader;
+
+  private String line;
+
+  private int recordCount;
+
+  public LTSVRecordIterator(RowSetLoader rowWriter, BufferedReader reader) {
+    this.rowWriter = rowWriter;
+    this.reader = reader;
+  }
+
+  public boolean nextRow() {
+    // Get the line
+    try {
+      line = reader.readLine();
+
+      // Increment record counter
+      recordCount++;
+
+      if (line == null) {
+        return false;
+      } else if (line.trim().length() == 0) {
+        // Skip empty lines
+        return true;
+      }
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Error reading LTSV Data: %s", e.getMessage())
+        .addContext("Line %d: %s", recordCount, line)
+        .build(logger);
+    }
+
+    // Process the row
+    processRow();
+
+    return true;
+  }
+
+  /**
+   * Function processes one row of data, splitting it up first by tabs then 
splitting the key/value pairs
+   * finally recording it in the current Drill row.
+   */
+  private void processRow() {
+
+    for (String field : line.split("\t")) {
+      int index = field.indexOf(":");
+      if (index <= 0) {
+        throw UserException
+          .dataReadError()
+          .message("Invalid LTSV format at line %d: %s", recordCount, line)
+          .build(logger);
+      }
+
+      String fieldName = field.substring(0, index);
+      String fieldValue = field.substring(index + 1);
+
+      LTSVBatchReader.writeStringColumn(rowWriter, fieldName, fieldValue);

Review comment:
       Seems odd to do this as a static method in the batch reader. Maybe just 
move it here if this is the only use.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);

Review comment:
       The idea is good; we may have to refine the implementation. My thought 
is to move the open logic into the `SchemaNegotiator` to keep readers simple 
(no complex base class). There is a series of PRs that incorporate the 
provided schema into the base mechanism.
   
   The new PRs redesign the readers so that the logic now done in `open()` will 
be done in the constructor so more fields can be `final`.
   
   To avoid slowing progress, I'm OK with duplicating that work here, but if we 
do, I'll eventually remove this code when the new version replaces it.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * To create a format plugin, there is often a great deal of cut/pasted code. 
The EasyEVFBatchReader
+ * is intended to allow developers of new format plugins to focus their energy 
on code that is truly unique
+ * for each format.
+ * <p>
+ * To create a new format plugin, simply extend this class, and overwrite the 
open() method as shown below in the
+ * snippet below. The code that is unique for the formats will be contained in 
the iterator class which
+ * <p>
+ * With respect to schema creation, there are three basic situations:
+ * <ol>
+ *   <li>Schema is known before opening the file</li>
+ *   <li>Schema is known (and unchanging) after reading the first row of 
data</li>
+ *   <li>Schema is completely flexible, IE: not consistent and not known after 
first row.</li>
+ * </ol>
+ *
+ * This class implements a series of methods to facilitate schema creation. 
However, to achieve the best
+ * possible performance, it is vital to use the correct methods to map the 
schema. Drill will perform fastest when
+ * the schema is known in advance and the writers can be stored in a data 
structure which minimizes the amount of string
+ * comparisons.
+ *
+ * <b>Current state</b>
+ * For the third scenario, where the schema is not consistent, there are a 
collection of functions, all named <pre>writeDataType()</pre>
+ * which accept a ScalarWriter, column name and value. These functions first 
check to see whether the column has been added to the schema, if not
+ * it adds it. If it has been added, the value will be added to the column.
+ *
+ * <p>
+ *
+ *   <pre>
+ *   public boolean open(FileSchemaNegotiator negotiator) {
+ *     super.open(negotiator);
+ *     super.fileIterator = new LTSVRecordIterator(getRowWriter(), reader);
+ *     return true;
+ *   }
+ *   </pre>
+ *
+ */
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);
+
+  public FileSplit split;
+
+  public EasyEVFIterator fileIterator;
+
+  public ResultSetLoader loader;
+
+  private RowSetLoader rowWriter;
+
+  public InputStream fsStream;
+
+  public BufferedReader reader;

Review comment:
       The buffered reader also assumes a particular way of parsing. Some 
parsers have their own internal mechanisms (such as JSON), so this field would 
also go unused. Further, the parser really should close the file, not this 
class; so we'd end up with confusion about which level should do the close.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * To create a format plugin, there is often a great deal of cut/pasted code. 
The EasyEVFBatchReader
+ * is intended to allow developers of new format plugins to focus their energy 
on code that is truly unique
+ * for each format.
+ * <p>
+ * To create a new format plugin, simply extend this class, and overwrite the 
open() method as shown below in the
+ * snippet below. The code that is unique for the formats will be contained in 
the iterator class which
+ * <p>
+ * With respect to schema creation, there are three basic situations:
+ * <ol>
+ *   <li>Schema is known before opening the file</li>
+ *   <li>Schema is known (and unchanging) after reading the first row of 
data</li>
+ *   <li>Schema is completely flexible, IE: not consistent and not known after 
first row.</li>
+ * </ol>
+ *
+ * This class implements a series of methods to facilitate schema creation. 
However, to achieve the best
+ * possible performance, it is vital to use the correct methods to map the 
schema. Drill will perform fastest when
+ * the schema is known in advance and the writers can be stored in a data 
structure which minimizes the amount of string
+ * comparisons.
+ *
+ * <b>Current state</b>
+ * For the third scenario, where the schema is not consistent, there are a 
collection of functions, all named <pre>writeDataType()</pre>
+ * which accept a ScalarWriter, column name and value. These functions first 
check to see whether the column has been added to the schema, if not
+ * it adds it. If it has been added, the value will be added to the column.
+ *
+ * <p>
+ *
+ *   <pre>
+ *   public boolean open(FileSchemaNegotiator negotiator) {
+ *     super.open(negotiator);
+ *     super.fileIterator = new LTSVRecordIterator(getRowWriter(), reader);
+ *     return true;
+ *   }
+ *   </pre>
+ *
+ */
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);
+
+  public FileSplit split;
+
+  public EasyEVFIterator fileIterator;

Review comment:
       The file iterator really only applies to the limited set of record-based 
files. For things like the JSON reader we'd end up ignoring this iterator which 
might make things a bit more complex.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * To create a format plugin, there is often a great deal of cut/pasted code. 
The EasyEVFBatchReader
+ * is intended to allow developers of new format plugins to focus their energy 
on code that is truly unique
+ * for each format.
+ * <p>
+ * To create a new format plugin, simply extend this class, and overwrite the 
open() method as shown below in the
+ * snippet below. The code that is unique for the formats will be contained in 
the iterator class which
+ * <p>
+ * With respect to schema creation, there are three basic situations:
+ * <ol>
+ *   <li>Schema is known before opening the file</li>
+ *   <li>Schema is known (and unchanging) after reading the first row of 
data</li>
+ *   <li>Schmea is completely flexible, IE: not consistent and not known after 
first row.</li>
+ * </ol>
+ *
+ * This class inmplements a series of methods to facilitate schema creation. 
However, to achieve the best
+ * possible performance, it is vital to use the correct methods to map the 
schema. Drill will perform fastest when
+ * the schema is known in advance and the writers can be stored in a data 
structure which minimizes the amount of string
+ * comparisons.
+ *
+ * <b>Current state</b>
+ * For the third scenario, where the schema is not consistent, there are a 
collection of functions, all named <pre>writeDataType()</pre>
+ * which accept a ScalarWriter, column name and value. These functions first 
check to see whether the column has been added to the schema, if not
+ * it adds it. If it has been added, the value will be added to the column.
+ *
+ * <p>
+ *
+ *   <pre>
+ *   public boolean open(FileSchemaNegotiator negotiator) {
+ *     super.open(negotiator);
+ *     super.fileIterator = new LTSVRecordIterator(getRowWriter(), reader);
+ *     return true;
+ *   }
+ *   </pre>
+ *
+ */
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);
+
+  public FileSplit split;
+
+  public EasyEVFIterator fileIterator;
+
+  public ResultSetLoader loader;
+
+  private RowSetLoader rowWriter;
+
+  public InputStream fsStream;
+
+  public BufferedReader reader;
+
+  public RowSetLoader getRowWriter() {
+    return rowWriter;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    this.split = negotiator.split();
+    this.loader = negotiator.build();
+    this.rowWriter = loader.writer();
+    try {
+      this.fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      this.reader = new BufferedReader(new InputStreamReader(fsStream, 
StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message(String.format("Failed to open input file: %s", 
split.getPath()))
+        .addContext(e.getMessage())
+        .build(logger);
+    }

Review comment:
       The above boilerplate should be abstracted out. I'm thinking the 
`SchemaNegotiator` would be a good place.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * To create a format plugin, there is often a great deal of cut/pasted code. 
The EasyEVFBatchReader
+ * is intended to allow developers of new format plugins to focus their energy 
on code that is truly unique
+ * for each format.
+ * <p>
+ * To create a new format plugin, simply extend this class, and overwrite the 
open() method as shown below in the
+ * snippet below. The code that is unique for the formats will be contained in 
the iterator class which
+ * <p>
+ * With respect to schema creation, there are three basic situations:
+ * <ol>
+ *   <li>Schema is known before opening the file</li>
+ *   <li>Schema is known (and unchanging) after reading the first row of 
data</li>
+ *   <li>Schmea is completely flexible, IE: not consistent and not known after 
first row.</li>
+ * </ol>
+ *
+ * This class inmplements a series of methods to facilitate schema creation. 
However, to achieve the best
+ * possible performance, it is vital to use the correct methods to map the 
schema. Drill will perform fastest when
+ * the schema is known in advance and the writers can be stored in a data 
structure which minimizes the amount of string
+ * comparisons.
+ *
+ * <b>Current state</b>
+ * For the third scenario, where the schema is not consistent, there are a 
collection of functions, all named <pre>writeDataType()</pre>
+ * which accept a ScalarWriter, column name and value. These functions first 
check to see whether the column has been added to the schema, if not
+ * it adds it. If it has been added, the value will be added to the column.
+ *
+ * <p>
+ *
+ *   <pre>
+ *   public boolean open(FileSchemaNegotiator negotiator) {
+ *     super.open(negotiator);
+ *     super.fileIterator = new LTSVRecordIterator(getRowWriter(), reader);
+ *     return true;
+ *   }
+ *   </pre>
+ *
+ */
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);
+
+  public FileSplit split;
+
+  public EasyEVFIterator fileIterator;
+
+  public ResultSetLoader loader;
+
+  private RowSetLoader rowWriter;
+
+  public InputStream fsStream;
+
+  public BufferedReader reader;
+
+  public RowSetLoader getRowWriter() {
+    return rowWriter;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    this.split = negotiator.split();
+    this.loader = negotiator.build();
+    this.rowWriter = loader.writer();
+    try {
+      this.fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      this.reader = new BufferedReader(new InputStreamReader(fsStream, 
StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message(String.format("Failed to open input file: %s", 
split.getPath()))
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (rowWriter.start()) {
+      if (!fileIterator.nextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      AutoCloseables.close(reader, fsStream);
+    } catch (Exception e) {
+      logger.warn("Error closing Input Streams.");
+    }
+  }
+
+  /**
+   * Writes a string column.  This function should be used when the schema is 
not known in advance or cannot be inferred from the first
+   * row of data.
+   * @param rowWriter The rowWriter object
+   * @param fieldName The name of the field to be written
+   * @param value The value to be written
+   */
+  public static void writeStringColumn(TupleWriter rowWriter, String 
fieldName, String value) {

Review comment:
       These methods are handy, but they encourage quick-and-dirty (but slow) 
approaches to reading records. In general, it is best to use indexed mappings 
from input fields to column writers to avoid a per-value hash.
   
   When I thought about this issue, I realized that most file formats have 
their own native data types, so conversion is needed from those types to a Java 
type before we could call the column writer methods. This means we need 
column-specific "converters". If we have those, then those converters are the 
right place to hold the corresponding column writer. Else, we could end up with 
*two* hash lookups per value, which would be horrible for performance.
   
   That is, the design we want is:
   
   * For column `i`, obtain the converter for `i`.
   * The converter reads data in a source-specific format.
   * The converter converts that source-specific format to a Java type.
   * The converter holds onto a column writer and writes the Java data to the 
column writer.
   
   This means each plugin will have its own way to map from source columns to 
converters: by name (nested, if the source supports maps), or (ideally) by 
position.
   
   Also, recall that Drill handles arrays and maps which are hard to handle 
with this approach unless we first materialize the data into a Java array or 
map, which would be costly.
   
   So, great idea, but the problem is a bit more complex than this approach 
suggests.
   
   All that said, it would be fine to create a class for the very narrow use 
case of sources that need no conversion and only work with simple scalar values.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/EasyEVFBatchReader.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.easy;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.joda.time.LocalDate;
+import org.joda.time.Period;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+
+
+/**
+ * To create a format plugin, there is often a great deal of cut/pasted code. 
The EasyEVFBatchReader
+ * is intended to allow developers of new format plugins to focus their energy 
on code that is truly unique
+ * for each format.
+ * <p>
+ * To create a new format plugin, simply extend this class, and overwrite the 
open() method as shown below in the
+ * snippet below. The code that is unique for the formats will be contained in 
the iterator class which
+ * <p>
+ * With respect to schema creation, there are three basic situations:
+ * <ol>
+ *   <li>Schema is known before opening the file</li>
+ *   <li>Schema is known (and unchanging) after reading the first row of 
data</li>
+ *   <li>Schmea is completely flexible, IE: not consistent and not known after 
first row.</li>
+ * </ol>
+ *
+ * This class inmplements a series of methods to facilitate schema creation. 
However, to achieve the best
+ * possible performance, it is vital to use the correct methods to map the 
schema. Drill will perform fastest when
+ * the schema is known in advance and the writers can be stored in a data 
structure which minimizes the amount of string
+ * comparisons.
+ *
+ * <b>Current state</b>
+ * For the third scenario, where the schema is not consistent, there are a 
collection of functions, all named <pre>writeDataType()</pre>
+ * which accept a ScalarWriter, column name and value. These functions first 
check to see whether the column has been added to the schema, if not
+ * it adds it. If it has been added, the value will be added to the column.
+ *
+ * <p>
+ *
+ *   <pre>
+ *   public boolean open(FileSchemaNegotiator negotiator) {
+ *     super.open(negotiator);
+ *     super.fileIterator = new LTSVRecordIterator(getRowWriter(), reader);
+ *     return true;
+ *   }
+ *   </pre>
+ *
+ */
+public abstract class EasyEVFBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(EasyEVFBatchReader.class);
+
+  public FileSplit split;
+
+  public EasyEVFIterator fileIterator;
+
+  public ResultSetLoader loader;
+
+  private RowSetLoader rowWriter;
+
+  public InputStream fsStream;
+
+  public BufferedReader reader;
+
+  public RowSetLoader getRowWriter() {
+    return rowWriter;
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    this.split = negotiator.split();
+    this.loader = negotiator.build();
+    this.rowWriter = loader.writer();
+    try {
+      this.fsStream = 
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      this.reader = new BufferedReader(new InputStreamReader(fsStream, 
StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message(String.format("Failed to open input file: %s", 
split.getPath()))
+        .addContext(e.getMessage())
+        .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (rowWriter.start()) {
+      if (!fileIterator.nextRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      AutoCloseables.close(reader, fsStream);

Review comment:
       Generally, we want to close a stream once. If the reader is built from 
the stream, it encapsulates, and will close, the stream. If the reader is not 
used, then the stream has to be closed. Unless the underlying parser closes it 
in which case we don't need to close it. So, not sure this is actually 
simplifying things, just raising messy questions.

##########
File path: exec/java-exec/src/test/java/org/apache/drill/test/ClusterTest.java
##########
@@ -125,4 +139,30 @@ public static void run(String query, Object... args) 
throws Exception {
   public QueryBuilder queryBuilder( ) {
     return client.queryBuilder();
   }
+
+  /**
+   * Generates a compressed version of the file for testing
+   * @param fileName Name of the input file
+   * @param codecName The desired CODEC to be used.
+   * @param outFileName Name of generated compressed file
+   * @throws IOException If function cannot generate file, throws IOException
+   */
+  public void generateCompressedFile(String fileName, String codecName, String 
outFileName) throws IOException {

Review comment:
       I like the idea of moving this. But, `ClusterTest` is perhaps the wrong 
place. Maybe create a utility class in the same package. (That way, it can also 
be used by `BaseTestQuery` and `OperatorFixture` based tests.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to