cgivre commented on a change in pull request #2125:
URL: https://github.com/apache/drill/pull/2125#discussion_r541939346



##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";

Review comment:
       Please use `ALL_CAPS` for constant names. 

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+

Review comment:
       I'd add a variable for the limit pushdown.
   Here, add the field:
   ```java
   private final int maxRecords;
   ```
   
   Then in the constructor, add:
   ```java
   maxRecords = scan.getMaxRecords();
   ```

##########
File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
##########
@@ -17,32 +17,59 @@
  */
 package org.apache.drill.exec.store.sequencefile;
 
-import java.io.DataOutputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.file.Paths;
 
-import org.junit.Test;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
 import org.apache.hadoop.io.BytesWritable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-public class TestSequenceFileReader extends BaseTestQuery {
+@Category(RowSetTests.class)
+public class TestSequenceFileReader extends ClusterTest {
 
-  public static String byteWritableString(String input) throws Exception {
-    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bout);
-    final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
-    writable.write(out);
-    return new String(bout.toByteArray());
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get("sequencefiles/"));
   }
 
   @Test
   public void testSequenceFileReader() throws Exception {
     testBuilder()
-      .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k, 
convert_from(t.binary_value, 'UTF8') as v " +
-        "from cp.`sequencefiles/simple.seq` t")
+      .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k, 
convert_from(t.binary_value, 'UTF8') as v "
+        + "from cp.`sequencefiles/simple.seq` t")
       .ordered()
       .baselineColumns("k", "v")
       .baselineValues(byteWritableString("key0"), byteWritableString("value0"))
       .baselineValues(byteWritableString("key1"), byteWritableString("value1"))
-      .build().run();
+      .build()
+      .run();
+  }
+
+  @Test
+  public void testOutput() {
+    String sql = "select convert_from(t.binary_key, 'UTF8'), 
convert_from(t.binary_value, 'UTF8')"
+        + "from cp.`sequencefiles/simple.seq` t";
+    QueryRowSetIterator iterator = queryBuilder().sql(sql).rowSetIterator();
+    for (RowSet rowset : iterator) {
+      rowset.print();
+      rowset.clear();
+    }
+  }
+
+  private static String byteWritableString(String input) throws Exception {
+    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bout);
+    final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
+    writable.write(out);
+    return new String(bout.toByteArray());
   }
+

Review comment:
       I'd recommend some additional work on the unit tests.  First, please 
remove any code in the unit tests that produces output to STDOUT (i.e., any 
print statements).
   
   Secondly, take a look at the test linked below.  I'd recommend refactoring 
all the unit tests for this reader to follow that pattern.  It verifies that 
the schema is being generated correctly and that the outputs are properly 
mapped using EVF.
   
   I'd also add unit tests for:
   * Star Query
   * Explicit Field Query
   * Limit Pushdown (Plan only)
   * SerDe 
   * Compressed File (If applicable)
   
   You can pretty much cut/paste from the file below for all those tests. 
   
   
   
https://github.com/apache/drill/blob/f79587ed14767463129f8905d2e36e55563655f2/contrib/format-spss/src/test/java/org/apache/drill/exec/store/spss/TestSpssReader.java#L110-L131

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();

Review comment:
       After you've defined the `split`, you should also define the 
`errorContext`.
   ```java
   errorContext = negotiator.parentErrorContext();
   ```
   Once you've done that, you can include the `errorContext` in all exceptions, 
which gives the user a lot more information about the exception. 
   

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;

Review comment:
       These two vars (`opUserName` and `queryUserName`) do not appear to be 
used.  Please remove them if unnecessary.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(

Review comment:
       Please consider throwing a `UserException` here (and elsewhere).

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);

Review comment:
       You should set the `isComplete` parameter here to `true` because you are 
not adding columns to the schema.  

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())

Review comment:
       You can remove the call to `e.getMessage()` here as it will be 
overwritten in the following `addContext()` call.  Also, I'd suggest moving the 
`e.getMessage()` to the actual `message()` call.  The file information will be 
provided via the `errorContext` so you don't need to include that in the 
message.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {

Review comment:
       I strongly recommend adding the limit pushdown code.   All you need to 
do is add the code I referred to earlier and then in this loop add the 
following:
   ```java
   if (loader.limitReached(maxRecords) ) {
     return false;
   }
   ```

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();

Review comment:
       I think `setLoader` can be local. 

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)

Review comment:
       Please make sure you add a message explaining what went wrong here. 
You'll want to make sure every error message has:
   * a clear message in the `message()` call, and
   * a call to `addContext(errorContext)`.
   
   This applies here and elsewhere.
   

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      logger.warn("Exception closing reader: {}", e);
+    }
+  }
+
+  @Override
+  public String toString() {

Review comment:
       The `toString()` function is not really needed for the BatchReader 
classes. 

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();

Review comment:
       The `close()` call is not needed here.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {

Review comment:
       Does the reader implement `AutoCloseable`?  If so, you might want to use 
that instead.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
##########
@@ -17,92 +17,77 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 public class SequenceFileFormatPlugin extends 
EasyFormatPlugin<SequenceFileFormatConfig> {
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf,
-                                  StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new 
SequenceFileFormatConfig(null));
-  }
 
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf,
-                                  StoragePluginConfig storageConfig, 
SequenceFileFormatConfig formatConfig) {
-    super(name, context, fsConf, storageConfig, formatConfig,
-      true, false, /* splittable = */ true, /* compressible = */ true,
-      formatConfig.getExtensions(), "sequencefile");
+  public SequenceFileFormatPlugin(String name,
+                                  DrillbitContext context,
+                                  Configuration fsConf,
+                                  StoragePluginConfig storageConfig,
+                                  SequenceFileFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
   }
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
SequenceFileFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = true;
+    config.compressible = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.readerOperatorType = CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    config.supportsProjectPushdown = true;
+    config.defaultName = SequenceFileFormatConfig.NAME;

Review comment:
       If you want to support limit pushdown, you should add
   ```
   config.supportsLimitPushdown = true;
   ```
   to this method.

##########
File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  private final String keySchema = "binary_key";
+  private final String valueSchema = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private ResultSetLoader setLoader;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(keySchema, MinorType.VARBINARY);
+    builder.addNullable(valueSchema, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw new ExecutionSetupException(
+        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
+          split.getPath(), split.getStart(), split.getLength()), e);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), false);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Sequencefile %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(keySchema);
+    valueWriter = loader.scalar(valueSchema);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      close();
+      throw UserException
+              .dataReadError(e)
+              .addContext("File Path", split.getPath())
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      logger.warn("Exception closing reader: {}", e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    long position = -1L;
+    try {
+      if (reader != null) {
+        position = reader.getPos();
+      }
+    } catch (IOException e) {
+      logger.trace("Unable to obtain reader position.", e);
+    }
+    return String.format("SequenceFileBatchReader[File=%s, Position=%s]", 
split.getPath(), position);
+  }
+

Review comment:
       NIT:  Remove the extra blank line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to