This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 4689e13 DRILL-7525: Convert SequenceFiles to EVF
4689e13 is described below
commit 4689e1343adb05562db9a16f3bf9e407d4e71794
Author: luoc <[email protected]>
AuthorDate: Sun Dec 13 18:12:53 2020 +0800
DRILL-7525: Convert SequenceFiles to EVF
---
.../easy/sequencefile/SequenceFileBatchReader.java | 186 +++++++++++++++++++++
.../sequencefile/SequenceFileFormatConfig.java | 29 ++--
.../sequencefile/SequenceFileFormatPlugin.java | 111 ++++++------
.../sequencefile/SequenceFileRecordReader.java | 181 --------------------
.../store/sequencefile/TestSequenceFileReader.java | 104 ++++++++++--
5 files changed, 336 insertions(+), 275 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
new file mode 100644
index 0000000..b5d80f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.sequencefile;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
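+/**
+ * Drill EVF batch reader for Hadoop sequence files. Each record is exposed as
+ * two nullable VARBINARY columns: {@code binary_key} and {@code binary_value}.
+ */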
+public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
+
+ private final SequenceFileFormatConfig config;
+ private final EasySubScan scan;
+ private FileSplit split;
+ private String queryUserName;
+ private String opUserName;
+ public static final String KEY_SCHEMA = "binary_key";
+ public static final String VALUE_SCHEMA = "binary_value";
+ private final BytesWritable key = new BytesWritable();
+ private final BytesWritable value = new BytesWritable();
+ private final int maxRecords;
+ private RowSetLoader loader;
+ private ScalarWriter keyWriter;
+ private ScalarWriter valueWriter;
+ private RecordReader<BytesWritable, BytesWritable> reader;
+ private CustomErrorContext errorContext;
+ private Stopwatch watch;
+
+ public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
+ this.maxRecords = scan.getMaxRecords();
+ }
+
+ private TupleMetadata defineMetadata() {
+ SchemaBuilder builder = new SchemaBuilder();
+ builder.addNullable(KEY_SCHEMA, MinorType.VARBINARY);
+ builder.addNullable(VALUE_SCHEMA, MinorType.VARBINARY);
+ return builder.buildSchema();
+ }
+
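+ // Resolves the file split, error context and user names, then creates the
+ // underlying Hadoop record reader through a proxy UGI for impersonation.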
+ private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
+ final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
+ split = negotiator.split();
+ // After defining the split, we should also define the errorContext.
+ errorContext = negotiator.parentErrorContext();
+ opUserName = scan.getUserName();
+ queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
+ final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+ jobConf.setInputFormat(inputFormat.getClass());
+ reader = getRecordReader(inputFormat, jobConf);
+ }
+
+ private RecordReader<BytesWritable, BytesWritable> getRecordReader(
+ final InputFormat<BytesWritable, BytesWritable> inputFormat, final JobConf jobConf)
+ throws ExecutionSetupException {
+ try {
+ final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+ return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
+ @Override
+ public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
+ return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+ }
+ });
+ } catch (IOException | InterruptedException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Error in creating sequencefile reader for file: %s,
start: %d, length: %d. "
+ + e.getMessage(), split.getPath(), split.getStart(),
split.getLength())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+
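+ // Defines the fixed key/value schema, opens the underlying reader and obtains the column writers.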
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ negotiator.tableSchema(defineMetadata(), true);
+ logger.debug("The config is {}, root is {}, columns has {}", config,
scan.getSelectionRoot(), scan.getColumns());
+ // open Sequencefile
+ try {
+ processReader(negotiator);
+ } catch (ExecutionSetupException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failure in initial sequencefile reader. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ ResultSetLoader setLoader = negotiator.build();
+ loader = setLoader.writer();
+ keyWriter = loader.scalar(KEY_SCHEMA);
+ valueWriter = loader.scalar(VALUE_SCHEMA);
+ return true;
+ }
+
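+ // Fills the current batch with key/value pairs until the batch is full,
+ // the row limit is reached, or the input is exhausted.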
+ @Override
+ public boolean next() {
+ int recordCount = 0;
+ if (watch == null) {
+ watch = Stopwatch.createStarted();
+ }
+ try {
+ while (!loader.isFull()) {
+ if (reader.next(key, value)) {
+ loader.start();
+ keyWriter.setBytes(key.getBytes(), key.getLength());
+ valueWriter.setBytes(value.getBytes(), value.getLength());
+ loader.save();
+ ++recordCount;
+ } else {
+ logger.debug("Read {} records in {} ms", recordCount,
watch.elapsed(TimeUnit.MILLISECONDS));
+ return false;
+ }
+ if (loader.limitReached(maxRecords)) {
+ return false;
+ }
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("An error occurred while reading the next key/value
pair from the sequencefile reader. "
+ + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ try {
+ // The reader does not support AutoCloseable, so it must be closed explicitly via close().
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed closing sequencefile reader. " + e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
index 0572ca7..3e9b570 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatConfig.java
@@ -17,29 +17,31 @@
*/
package org.apache.drill.exec.store.easy.sequencefile;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.common.PlanStringBuilder;
-import org.apache.drill.common.logical.FormatPluginConfig;
-import java.util.List;
-import java.util.Objects;
-
-@JsonTypeName("sequencefile") @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+@JsonTypeName(SequenceFileFormatConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class SequenceFileFormatConfig implements FormatPluginConfig {
+ public static final String NAME = "sequencefile";
private final List<String> extensions;
@JsonCreator
- public SequenceFileFormatConfig(
- @JsonProperty("extensions") List<String> extensions) {
- this.extensions = extensions == null ?
- ImmutableList.of() : ImmutableList.copyOf(extensions);
+ public SequenceFileFormatConfig(@JsonProperty("extensions") List<String> extensions) {
+ this.extensions = extensions == null ? ImmutableList.of() : ImmutableList.copyOf(extensions);
}
+ @JsonProperty("extensions")
public List<String> getExtensions() {
return extensions;
}
@@ -63,8 +65,7 @@ public class SequenceFileFormatConfig implements FormatPluginConfig {
@Override
public String toString() {
- return new PlanStringBuilder(this)
- .field("extensions", extensions)
- .toString();
+ return new PlanStringBuilder(this).field("extensions", extensions).toString();
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 9e55448..3c62fd5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -17,92 +17,77 @@
*/
package org.apache.drill.exec.store.easy.sequencefile;
-import java.io.IOException;
-import java.util.List;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileFormatConfig> {
- public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig) {
- this(name, context, fsConf, storageConfig, new SequenceFileFormatConfig(null));
- }
- public SequenceFileFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
- StoragePluginConfig storageConfig, SequenceFileFormatConfig formatConfig) {
- super(name, context, fsConf, storageConfig, formatConfig,
- true, false, /* splittable = */ true, /* compressible = */ true,
- formatConfig.getExtensions(), "sequencefile");
+ public SequenceFileFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ SequenceFileFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
}
- @Override
- public boolean supportsPushDown() {
- return true;
+ private static EasyFormatConfig easyConfig(Configuration fsConf, SequenceFileFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = false;
+ config.blockSplittable = true;
+ config.compressible = true;
+ config.extensions = pluginConfig.getExtensions();
+ config.fsConf = fsConf;
+ config.readerOperatorType = CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+ config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
+ config.supportsProjectPushdown = true;
+ config.defaultName = SequenceFileFormatConfig.NAME;
+ return config;
}
- @Override
- public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns)
- throws IOException {
- return new EasyGroupScan(userName, selection, this, columns, selection.selectionRoot, null);
- }
+ private static class SequenceFileReaderFactory extends FileReaderFactory {
- @Override
- public boolean supportsStatistics() {
- return false;
- }
+ private final SequenceFileFormatConfig config;
+ private final EasySubScan scan;
- @Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
+ public SequenceFileReaderFactory(SequenceFileFormatConfig config, EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
+ }
- @Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
- throw new UnsupportedOperationException("unimplemented");
- }
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new SequenceFileBatchReader(config, scan);
+ }
- @Override
- public RecordReader getRecordReader(FragmentContext context,
- DrillFileSystem dfs,
- FileWork fileWork,
- List<SchemaPath> columns,
- String userName) throws ExecutionSetupException {
- final Path path = dfs.makeQualified(fileWork.getPath());
- final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
- return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName);
}
@Override
- public int getReaderOperatorType() {
- return CoreOperatorType.SEQUENCE_SUB_SCAN_VALUE;
+ public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
+ throws ExecutionSetupException {
+ return new SequenceFileBatchReader(formatConfig, scan);
}
@Override
- public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
- throw new UnsupportedOperationException();
- }
+ protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+ FileScanBuilder builder = new FileScanBuilder();
+ builder.setReaderFactory(new SequenceFileReaderFactory(formatConfig, scan));
- @Override
- public int getWriterOperatorType() {
- throw new UnsupportedOperationException();
+ initScanBuilder(builder, scan);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
}
}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
deleted file mode 100644
index 7f9b993..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.sequencefile;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-
-
-public class SequenceFileRecordReader extends AbstractRecordReader {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SequenceFileRecordReader.class);
-
- private static final int PER_BATCH_RECORD_COUNT = 4096;
- private static final int PER_BATCH_BYTES = 256*1024;
-
- private static final MajorType KEY_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
- private static final MajorType VALUE_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY);
-
- private final String keySchema = "binary_key";
- private final String valueSchema = "binary_value";
-
- private NullableVarBinaryVector keyVector;
- private NullableVarBinaryVector valueVector;
- private final FileSplit split;
- private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> reader;
- private final BytesWritable key = new BytesWritable();
- private final BytesWritable value = new BytesWritable();
- private final DrillFileSystem dfs;
- private final String queryUserName;
- private final String opUserName;
-
- public SequenceFileRecordReader(final FileSplit split,
- final DrillFileSystem dfs,
- final String queryUserName,
- final String opUserName) {
- final List<SchemaPath> columns = new ArrayList<>();
- columns.add(SchemaPath.getSimplePath(keySchema));
- columns.add(SchemaPath.getSimplePath(valueSchema));
- setColumns(columns);
- this.dfs = dfs;
- this.split = split;
- this.queryUserName = queryUserName;
- this.opUserName = opUserName;
- }
-
- @Override
- protected boolean isSkipQuery() {
- return false;
- }
-
- private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> getRecordReader(
- final InputFormat<BytesWritable, BytesWritable> inputFormat,
- final JobConf jobConf) throws ExecutionSetupException {
- try {
- final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
- return ugi.doAs(new PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable>>() {
- @Override
- public org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> run() throws Exception {
- return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
- }
- });
- } catch (IOException | InterruptedException e) {
- throw new ExecutionSetupException(
- String.format("Error in creating sequencefile reader for file: %s,
start: %d, length: %d",
- split.getPath(), split.getStart(), split.getLength()), e);
- }
- }
-
- @Override
- public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
- final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
- final JobConf jobConf = new JobConf(dfs.getConf());
- jobConf.setInputFormat(inputFormat.getClass());
- reader = getRecordReader(inputFormat, jobConf);
- final MaterializedField keyField = MaterializedField.create(keySchema, KEY_TYPE);
- final MaterializedField valueField = MaterializedField.create(valueSchema, VALUE_TYPE);
- try {
- keyVector = output.addField(keyField, NullableVarBinaryVector.class);
- valueVector = output.addField(valueField, NullableVarBinaryVector.class);
- } catch (SchemaChangeException sce) {
- throw new ExecutionSetupException("Error in setting up sequencefile
reader.", sce);
- }
- }
-
- @Override
- public int next() {
- final Stopwatch watch = Stopwatch.createStarted();
- if (keyVector != null) {
- keyVector.clear();
- keyVector.allocateNew();
- }
- if (valueVector != null) {
- valueVector.clear();
- valueVector.allocateNew();
- }
- int recordCount = 0;
- int batchSize = 0;
- try {
- while (recordCount < PER_BATCH_RECORD_COUNT && batchSize < PER_BATCH_BYTES && reader.next(key, value)) {
- keyVector.getMutator().setSafe(recordCount, key.getBytes(), 0, key.getLength());
- valueVector.getMutator().setSafe(recordCount, value.getBytes(), 0, value.getLength());
- batchSize += (key.getLength() + value.getLength());
- ++recordCount;
- }
- keyVector.getMutator().setValueCount(recordCount);
- valueVector.getMutator().setValueCount(recordCount);
- logger.debug("Read {} records in {} ms", recordCount,
watch.elapsed(TimeUnit.MILLISECONDS));
- return recordCount;
- } catch (IOException ioe) {
- close();
- throw UserException.dataReadError(ioe).addContext("File Path", split.getPath().toString()).build(logger);
- }
- }
-
- @Override
- public void close() {
- try {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- } catch (IOException e) {
- logger.warn("Exception closing reader: {}", e);
- }
- }
-
- @Override
- public String toString() {
- long position = -1L;
- try {
- if (reader != null) {
- position = reader.getPos();
- }
- } catch (IOException e) {
- logger.trace("Unable to obtain reader position.", e);
- }
- return "SequenceFileRecordReader[File=" + split.getPath()
- + ", Position=" + position
- + "]";
- }
-}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
index 7d23b1d..fd72e8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sequencefile/TestSequenceFileReader.java
@@ -17,32 +17,102 @@
*/
package org.apache.drill.exec.store.sequencefile;
-import java.io.DataOutputStream;
+import static org.junit.Assert.assertEquals;
+
import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.file.Paths;
-import org.junit.Test;
-import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.easy.sequencefile.SequenceFileBatchReader;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.hadoop.io.BytesWritable;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestSequenceFileReader extends ClusterTest {
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+ dirTestWatcher.copyResourceToRoot(Paths.get("sequencefiles/"));
+ }
+
+ @Test
+ public void testStarQuery() throws Exception {
+ String sql = "select * from cp.`sequencefiles/simple.seq`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable(SequenceFileBatchReader.KEY_SCHEMA, MinorType.VARBINARY)
+ .addNullable(SequenceFileBatchReader.VALUE_SCHEMA, MinorType.VARBINARY)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("key1".getBytes(), "value1".getBytes())
+ .addRow("key2".getBytes(), "value2".getBytes())
+ .build();
+
+ assertEquals(2, sets.rowCount());
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "select convert_from(binary_key, 'UTF8') as binary_key from
cp.`sequencefiles/simple.seq`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable(SequenceFileBatchReader.KEY_SCHEMA, MinorType.VARCHAR)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow(byteWritableString("key0"))
+ .addRow(byteWritableString("key1"))
+ .build();
+
+ assertEquals(2, sets.rowCount());
+
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "select * from cp.`sequencefiles/simple.seq` limit 1 offset
1";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
-public class TestSequenceFileReader extends BaseTestQuery {
+ assertEquals(1, sets.rowCount());
+ sets.clear();
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = "select count(*) from cp.`sequencefiles/simple.seq`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+
+ assertEquals("Counts should match", 2, cnt);
+ }
- public static String byteWritableString(String input) throws Exception {
+ private static String byteWritableString(String input) throws Exception {
final ByteArrayOutputStream bout = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bout);
final BytesWritable writable = new BytesWritable(input.getBytes("UTF-8"));
writable.write(out);
return new String(bout.toByteArray());
}
-
- @Test
- public void testSequenceFileReader() throws Exception {
- testBuilder()
- .sqlQuery("select convert_from(t.binary_key, 'UTF8') as k,
convert_from(t.binary_value, 'UTF8') as v " +
- "from cp.`sequencefiles/simple.seq` t")
- .ordered()
- .baselineColumns("k", "v")
- .baselineValues(byteWritableString("key0"), byteWritableString("value0"))
- .baselineValues(byteWritableString("key1"), byteWritableString("value1"))
- .build().run();
- }
}