This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 39fc783206 Initial Commit (#2654)
39fc783206 is described below
commit 39fc783206308f253035b802f6529f97c0b5abe2
Author: Charles S. Givre <[email protected]>
AuthorDate: Thu Sep 22 11:28:58 2022 -0400
Initial Commit (#2654)
---
.../drill/exec/store/spss/SpssBatchReader.java | 43 ++++++----------------
.../drill/exec/store/spss/SpssFormatPlugin.java | 36 ++++++------------
2 files changed, 23 insertions(+), 56 deletions(-)
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
index 46cff86819..ce77e83b07 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssBatchReader.java
@@ -24,14 +24,14 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,26 +41,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SpssBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(SpssBatchReader.class);
private static final String VALUE_LABEL = "_value";
-
- private final int maxRecords;
-
- private FileSplit split;
-
+ private final FileDescrip file;
private InputStream fsStream;
-
private SpssDataFileReader spssReader;
-
private RowSetLoader rowWriter;
-
private List<SpssVariable> variableList;
-
private List<SpssColumnWriter> writerList;
-
private CustomErrorContext errorContext;
@@ -73,21 +64,14 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public SpssBatchReader(int maxRecords) {
- this.maxRecords = maxRecords;
- }
-
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
- openFile(negotiator);
+ public SpssBatchReader(FileSchemaNegotiator negotiator) {
+ file = negotiator.file();
+ openFile();
negotiator.tableSchema(buildSchema(), true);
errorContext = negotiator.parentErrorContext();
ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
buildReaderList();
-
- return true;
}
@Override
@@ -108,14 +92,14 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- private void openFile(FileSchemaNegotiator negotiator) {
+ private void openFile() {
try {
-      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
spssReader = new SpssDataFileReader(fsStream);
} catch (IOException e) {
throw UserException
.dataReadError(e)
- .message("Unable to open SPSS File %s", split.getPath())
+ .message("Unable to open SPSS File %s", file.split().getPath())
.addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
@@ -123,11 +107,6 @@ public class SpssBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
private boolean processNextRow() {
- // Check to see if the limit has been reached
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
try {
// Stop reading when you run out of data
if (!spssReader.readNextCase()) {
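[Editor's note] The reader half of this change follows the EVF V2 lifecycle: the constructor now receives the FileSchemaNegotiator and does everything the old open() method did, and the manual maxRecords check disappears because the V2 framework enforces LIMIT pushdown itself. Below is a minimal sketch of that shape, using the v3 interfaces from the hunks above; the ExampleBatchReader name and the trivial next() body are illustrative, not part of this commit.

import java.io.IOException;
import java.io.InputStream;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleBatchReader implements ManagedReader {
  private static final Logger logger = LoggerFactory.getLogger(ExampleBatchReader.class);

  private final FileDescrip file;
  private final RowSetLoader rowWriter;
  private InputStream fsStream;

  // EVF V2 replaces open(): all setup happens in the constructor,
  // which gets the negotiator straight from the reader factory.
  public ExampleBatchReader(FileSchemaNegotiator negotiator) {
    file = negotiator.file();
    try {
      fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
    } catch (IOException e) {
      throw UserException.dataReadError(e)
        .message("Unable to open file %s", file.split().getPath())
        .addContext(negotiator.parentErrorContext())
        .build(logger);
    }
    rowWriter = negotiator.build().writer();
  }

  @Override
  public boolean next() {
    // Fill the batch until it is full or the input runs out. There is no
    // rowWriter.limitReached(maxRecords) call: V2 applies LIMIT on its own.
    while (!rowWriter.isFull()) {
      // A real reader would decode one record into rowWriter here; this
      // sketch has no input, so it reports end-of-data immediately.
      return false;
    }
    return true;
  }

  @Override
  public void close() {
    AutoCloseables.closeSilently(fsStream);
  }
}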
diff --git a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
index 35210e7928..b0d42f1ca5 100644
--- a/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
+++ b/contrib/format-spss/src/main/java/org/apache/drill/exec/store/spss/SpssFormatPlugin.java
@@ -21,16 +21,13 @@ package org.apache.drill.exec.store.spss;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
@@ -40,15 +37,15 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
private static class SpssReaderFactory extends FileReaderFactory {
- private final int maxRecords;
+ private final EasySubScan scan;
- public SpssReaderFactory(int maxRecords) {
- this.maxRecords = maxRecords;
+ public SpssReaderFactory(EasySubScan scan) {
+ this.scan = scan;
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new SpssBatchReader(maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new SpssBatchReader(negotiator);
}
}
@@ -68,23 +65,14 @@ public class SpssFormatPlugin extends EasyFormatPlugin<SpssFormatConfig> {
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return new SpssBatchReader(scan.getMaxRecords());
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new SpssReaderFactory(scan.getMaxRecords()));
-
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new SpssReaderFactory(scan));
}
}
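[Editor's note] On the plugin side, V2 moves reader construction into the factory's newReader(negotiator) override, and configureScan() replaces frameworkBuilder(). A sketch of that wiring, reusing the hypothetical ExampleBatchReader from the earlier sketch; only the v3 framework types come from this commit.

import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;

public class ExampleReaderFactory extends FileReaderFactory {
  private final EasySubScan scan;

  public ExampleReaderFactory(EasySubScan scan) {
    // Keep the sub-scan for readers that need scan-level settings;
    // the LIMIT no longer has to be threaded through by hand.
    this.scan = scan;
  }

  @Override
  public ManagedReader newReader(FileSchemaNegotiator negotiator) {
    // The framework hands each reader its own negotiator, so the reader
    // is fully constructed here rather than opened later by the scan.
    return new ExampleBatchReader(negotiator);
  }
}

With the factory in place, the plugin's configureScan() override only has to set the null type and register the factory, exactly as the last hunk above does.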