This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 4689e13  DRILL-7525: Convert SequenceFiles to EVF
4689e13 is described below

commit 4689e1343adb05562db9a16f3bf9e407d4e71794
Author: luoc <[email protected]>
AuthorDate: Sun Dec 13 18:12:53 2020 +0800

    DRILL-7525: Convert SequenceFiles to EVF
---
 .../easy/sequencefile/SequenceFileBatchReader.java | 186 +++++++++++++++++++++
 .../sequencefile/SequenceFileFormatConfig.java     |  29 ++--
 .../sequencefile/SequenceFileFormatPlugin.java     | 111 ++++++------
 .../sequencefile/SequenceFileRecordReader.java     | 181 --------------------
 .../store/sequencefile/TestSequenceFileReader.java | 104 ++++++++++--
 5 files changed, 336 insertions(+), 275 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
new file mode 100644
index 0000000..b5d80f4
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileBatchReader implements 
ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+  private final SequenceFileFormatConfig config;
+  private final EasySubScan scan;
+  private FileSplit split;
+  private String queryUserName;
+  private String opUserName;
+  public static final String KEY_SCHEMA = "binary_key";
+  public static final String VALUE_SCHEMA = "binary_value";
+  private final BytesWritable key = new BytesWritable();
+  private final BytesWritable value = new BytesWritable();
+  private final int maxRecords;
+  private RowSetLoader loader;
+  private ScalarWriter keyWriter;
+  private ScalarWriter valueWriter;
+  private RecordReader<BytesWritable, BytesWritable> reader;
+  private CustomErrorContext errorContext;
+  private Stopwatch watch;
+
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+    this.maxRecords = scan.getMaxRecords();
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    builder.addNullable(KEY_SCHEMA, MinorType.VARBINARY);
+    builder.addNullable(VALUE_SCHEMA, MinorType.VARBINARY);
+    return builder.buildSchema();
+  }
+
+  private void processReader(FileSchemaNegotiator negotiator) throws 
ExecutionSetupException {
+    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
+    split = negotiator.split();
+    // After defined the split, We should also define the errorContext.
+    errorContext = negotiator.parentErrorContext();
+    opUserName = scan.getUserName();
+    queryUserName = 
negotiator.context().getFragmentContext().getQueryUserName();
+    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    jobConf.setInputFormat(inputFormat.getClass());
+    reader = getRecordReader(inputFormat, jobConf);
+  }
+
+  private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+    final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf 
jobConf)
+    throws ExecutionSetupException {
+    try {
+      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+      return ugi.doAs(new 
PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+        @Override
+        public RecordReader<BytesWritable, BytesWritable> run() throws 
Exception {
+          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+        }
+      });
+    } catch (IOException | InterruptedException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d. "
+               + e.getMessage(), split.getPath(), split.getStart(), 
split.getLength())
+              .addContext(errorContext)
+              .build(logger);
+    }
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    negotiator.tableSchema(defineMetadata(), true);
+    logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), scan.getColumns());
+    // open Sequencefile
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failure in initial sequencefile reader. " + e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    ResultSetLoader setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(KEY_SCHEMA);
+    valueWriter = loader.scalar(VALUE_SCHEMA);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    int recordCount = 0;
+    if (watch == null) {
+      watch = Stopwatch.createStarted();
+    }
+    try {
+      while (!loader.isFull()) {
+        if (reader.next(key, value)) {
+          loader.start();
+          keyWriter.setBytes(key.getBytes(), key.getLength());
+          valueWriter.setBytes(value.getBytes(), value.getLength());
+          loader.save();
+          ++ recordCount;
+        } else {
+          logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
+          return false;
+        }
+        if (loader.limitReached(maxRecords)) {
+          return false;
+        }
+      }
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("An error occurred while reading the next key/value 
pair from the sequencefile reader. "
+               + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    try {
+      // The reader not support AutoCloseable, must be closed by invoke 
close().
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (IOException e) {
+      throw UserException
+              .dataReadError(e)
+              .message("Failed closing sequencefile reader. " + e.getMessage())
+              .addContext(errorContext)
+              .build(logger);
+    }
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
index 0572ca7..3e9b570 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
@@ -17,29 +17,31 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
 
-import java.util.List;
-import java.util.Objects;
-
-@JsonTypeName("sequencefile") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+@JsonTypeName(SequenceFileFormatConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class SequenceFileFormatConfig implements FormatPluginConfig {
 
+  public static final String NAME = "sequencefile";
   private final List<String> extensions;
 
   @JsonCreator
-  public SequenceFileFormatConfig(
-      @JsonProperty("extensions") List<String> extensions) {
-    this.extensions = extensions == null ?
-        ImmutableList.of() : ImmutableList.copyOf(extensions);
+  public SequenceFileFormatConfig(@JsonProperty("extensions") List<String> 
extensions) {
+    this.extensions = extensions == null ? ImmutableList.of() : 
ImmutableList.copyOf(extensions);
   }
 
+  @JsonProperty("extensions")
   public List<String> getExtensions() {
     return extensions;
   }
@@ -63,8 +65,7 @@ public class SequenceFileFormatConfig implements 
FormatPluginConfig {
 
   @Override
   public String toString() {
-    return new PlanStringBuilder(this)
-        .field("extensions", extensions)
-        .toString();
+    return new PlanStringBuilder(this).field("extensions", 
extensions).toString();
   }
+
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 9e55448..3c62fd5 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -17,92 +17,77 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 public class SequenceFileFormatPlugin extends 
EasyFormatPlugin<SequenceFileFormatConfig> {
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf,
-                                  StoragePluginConfig storageConfig) {
-    this(name, context, fsConf, storageConfig, new 
SequenceFileFormatConfig(null));
-  }
 
-  public SequenceFileFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf,
-                                  StoragePluginConfig storageConfig, 
SequenceFileFormatConfig formatConfig) {
-    super(name, context, fsConf, storageConfig, formatConfig,
-      true, false, /* splittable = */ true, /* compressible = */ true,
-      formatConfig.getExtensions(), "sequencefile");
+  public SequenceFileFormatPlugin(String name,
+                                  DrillbitContext context,
+                                  Configuration fsConf,
+                                  StoragePluginConfig storageConfig,
+                                  SequenceFileFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
   }
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
SequenceFileFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = true;
+    config.compressible = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.readerOperatorType = CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
+    config.supportsProjectPushdown = true;
+    config.defaultName = SequenceFileFormatConfig.NAME;
+    return config;
   }
 
-  @Override
-  public AbstractGroupScan getGroupScan(String userName, FileSelection 
selection, List<SchemaPath> columns)
-    throws IOException {
-    return new EasyGroupScan(userName, selection, this, columns, 
selection.selectionRoot, null);
-  }
+  private static class SequenceFileReaderFactory extends FileReaderFactory {
 
-  @Override
-  public boolean supportsStatistics() {
-    return false;
-  }
+    private final SequenceFileFormatConfig config;
+    private final EasySubScan scan;
 
-  @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) 
throws IOException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
+    public SequenceFileReaderFactory(SequenceFileFormatConfig config, 
EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
+    }
 
-  @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path 
statsTablePath) throws IOException {
-    throw new UnsupportedOperationException("unimplemented");
-  }
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new SequenceFileBatchReader(config, scan);
+    }
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context,
-                                      DrillFileSystem dfs,
-                                      FileWork fileWork,
-                                      List<SchemaPath> columns,
-                                      String userName) throws 
ExecutionSetupException {
-    final Path path = dfs.makeQualified(fileWork.getPath());
-    final FileSplit split = new FileSplit(path, fileWork.getStart(), 
fileWork.getLength(), new String[]{""});
-    return new SequenceFileRecordReader(split, dfs, 
context.getQueryUserName(), userName);
   }
 
   @Override
-  public int getReaderOperatorType() {
-    return CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+  public ManagedReader<? extends FileSchemaNegotiator> 
newBatchReader(EasySubScan scan, OptionManager options)
+      throws ExecutionSetupException {
+    return new SequenceFileBatchReader(formatConfig, scan);
   }
 
   @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) throws IOException {
-    throw new UnsupportedOperationException();
-  }
+  protected FileScanBuilder frameworkBuilder(OptionManager options, 
EasySubScan scan) throws ExecutionSetupException {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new SequenceFileReaderFactory(formatConfig, 
scan));
 
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException();
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
   }
 }
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
deleted file mode 100644
index 7f9b993..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.sequencefile;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-
-
-public class SequenceFileRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(SequenceFileRecordReader.class);
-
-  private static final int PER_BATCH_RECORD_COUNT = 4096;
-  private static final int PER_BATCH_BYTES = 256*1024;
-
-  private static final MajorType KEY_TYPE = 
Types.optional(TypeProtos.MinorType.VARBINARY);
-  private static final MajorType VALUE_TYPE = 
Types.optional(TypeProtos.MinorType.VARBINARY);
-
-  private final String keySchema = "binary_key";
-  private final String valueSchema = "binary_value";
-
-  private NullableVarBinaryVector keyVector;
-  private NullableVarBinaryVector valueVector;
-  private final FileSplit split;
-  private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> 
reader;
-  private final BytesWritable key = new BytesWritable();
-  private final BytesWritable value = new BytesWritable();
-  private final DrillFileSystem dfs;
-  private final String queryUserName;
-  private final String opUserName;
-
-  public SequenceFileRecordReader(final FileSplit split,
-                                  final DrillFileSystem dfs,
-                                  final String queryUserName,
-                                  final String opUserName) {
-    final List<SchemaPath> columns = new ArrayList<>();
-    columns.add(SchemaPath.getSimplePath(keySchema));
-    columns.add(SchemaPath.getSimplePath(valueSchema));
-    setColumns(columns);
-    this.dfs = dfs;
-    this.split = split;
-    this.queryUserName = queryUserName;
-    this.opUserName = opUserName;
-  }
-
-  @Override
-  protected boolean isSkipQuery() {
-    return false;
-  }
-
-  private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> 
getRecordReader(
-    final InputFormat<BytesWritable, BytesWritable> inputFormat,
-    final JobConf jobConf) throws ExecutionSetupException {
-    try {
-      final UserGroupInformation ugi = 
ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
-      return ugi.doAs(new 
PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader<BytesWritable, 
BytesWritable>>() {
-        @Override
-        public org.apache.hadoop.mapred.RecordReader<BytesWritable, 
BytesWritable> run() throws Exception {
-          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
-        }
-      });
-    } catch (IOException | InterruptedException e) {
-      throw new ExecutionSetupException(
-        String.format("Error in creating sequencefile reader for file: %s, 
start: %d, length: %d",
-          split.getPath(), split.getStart(), split.getLength()), e);
-    }
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
-    final SequenceFileAsBinaryInputFormat inputFormat = new 
SequenceFileAsBinaryInputFormat();
-    final JobConf jobConf = new JobConf(dfs.getConf());
-    jobConf.setInputFormat(inputFormat.getClass());
-    reader = getRecordReader(inputFormat, jobConf);
-    final MaterializedField keyField = MaterializedField.create(keySchema, 
KEY_TYPE);
-    final MaterializedField valueField = MaterializedField.create(valueSchema, 
VALUE_TYPE);
-    try {
-      keyVector = output.addField(keyField, NullableVarBinaryVector.class);
-      valueVector = output.addField(valueField, NullableVarBinaryVector.class);
-    } catch (SchemaChangeException sce) {
-      throw new ExecutionSetupException("Error in setting up sequencefile 
reader.", sce);
-    }
-  }
-
-  @Override
-  public int next() {
-    final Stopwatch watch = Stopwatch.createStarted();
-    if (keyVector != null) {
-      keyVector.clear();
-      keyVector.allocateNew();
-    }
-    if (valueVector != null) {
-      valueVector.clear();
-      valueVector.allocateNew();
-    }
-    int recordCount = 0;
-    int batchSize = 0;
-    try {
-      while (recordCount < PER_BATCH_RECORD_COUNT && batchSize < 
PER_BATCH_BYTES && reader.next(key, value)) {
-        keyVector.getMutator().setSafe(recordCount, key.getBytes(), 0, 
key.getLength());
-        valueVector.getMutator().setSafe(recordCount, value.getBytes(), 0, 
value.getLength());
-        batchSize += (key.getLength() + value.getLength());
-        ++recordCount;
-      }
-      keyVector.getMutator().setValueCount(recordCount);
-      valueVector.getMutator().setValueCount(recordCount);
-      logger.debug("Read {} records in {} ms", recordCount, 
watch.elapsed(TimeUnit.MILLISECONDS));
-      return recordCount;
-    } catch (IOException ioe) {
-      close();
-      throw UserException.dataReadError(ioe).addContext("File Path", 
split.getPath().toString()).build(logger);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (IOException e) {
-      logger.warn("Exception closing reader: {}", e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    long position = -1L;
-    try {
-      if (reader != null) {
-        position = reader.getPos();
-      }
-    } catch (IOException e) {
-      logger.trace("Unable to obtain reader position.", e);
-    }
-    return "SequenceFileRecordReader[File=" + split.getPath()
-        + ", Position=" + position
-        + "]";
-  }
-}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
index 7d23b1d..fd72e8e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
@@ -17,32 +17,102 @@
  */
 package org.apache.drill.exec.store.sequencefile;
 
-import java.io.DataOutputStream;
+import static org.junit.Assert.assertEquals;
+
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.file.Paths;
 
-import org.junit.Test;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.sequencefile.SequenceFileBatchReader;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.hadoop.io.BytesWritable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestSequenceFileReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    dirTestWatcher.copyResourceToRoot(Paths.get("sequencefiles/"));
+  }
+
+  @Test
+  public void testStarQuery() throws Exception {
+    String sql = "select * from cp.`sequencefiles/simple.seq`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable(SequenceFileBatchReader.KEY_SCHEMA, MinorType.VARBINARY)
+        .addNullable(SequenceFileBatchReader.VALUE_SCHEMA, MinorType.VARBINARY)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("key1".getBytes(), "value1".getBytes())
+        .addRow("key2".getBytes(), "value2".getBytes())
+        .build();
+
+    assertEquals(2, sets.rowCount());
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "select convert_from(binary_key, 'UTF8') as binary_key from 
cp.`sequencefiles/simple.seq`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable(SequenceFileBatchReader.KEY_SCHEMA, MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow(byteWritableString("key0"))
+        .addRow(byteWritableString("key1"))
+        .build();
+
+    assertEquals(2, sets.rowCount());
+
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "select * from cp.`sequencefiles/simple.seq` limit 1 offset 
1";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
 
-public class TestSequenceFileReader extends BaseTestQuery {
+    assertEquals(1, sets.rowCount());
+    sets.clear();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "select count(*) from cp.`sequencefiles/simple.seq`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+
+    assertEquals("Counts should match", 2, cnt);
+  }
 
-  public static String byteWritableString(String input) throws Exception {
+  private static String byteWritableString(String input) throws Exception {
     final ByteArrayOutputStream bout = new ByteArrayOutputStream();
     DataOutputStream out = new DataOutputStream(bout);
     final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
     writable.write(out);
     return new String(bout.toByteArray());
   }
-
-  @Test
-  public void testSequenceFileReader() throws Exception {
-    testBuilder()
-      .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k, 
convert_from(t.binary_value, 'UTF8') as v " +
-        "from cp.`sequencefiles/simple.seq` t")
-      .ordered()
-      .baselineColumns("k", "v")
-      .baselineValues(byteWritableString("key0"), byteWritableString("value0"))
-      .baselineValues(byteWritableString("key1"), byteWritableString("value1"))
-      .build().run();
-  }
 }

Reply via email to