This is an automated email from the ASF dual-hosted git repository.
arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 2224ee1 DRILL-7293: Convert the regex ("log") plugin to use EVF
2224ee1 is described below
commit 2224ee1014cfdcc2b76f1a8290171ca63db76a53
Author: Paul Rogers <[email protected]>
AuthorDate: Wed Jun 12 18:28:16 2019 -0700
DRILL-7293: Convert the regex ("log") plugin to use EVF
Converts the log format plugin (which uses a regex for parsing) to work
with the enhanced vector framework (EVF).
User-visible behavior changes are documented in the README file.
* Use the plugin config object to pass config to the Easy framework.
* Use the EVF scan mechanism in place of the legacy "ScanBatch"
mechanism.
* Minor code and README cleanup.
* Replace ad-hoc type conversion with built-in conversions
The provided schema support in the enhanced vector framework (EVF)
provides automatic conversions from VARCHAR to most types. The log
format plugin was created before EVF was available and provided its own
conversion mechanism. This commit removes the ad-hoc conversion code and
instead uses the log plugin config schema information to create an
"output schema", just as if it had come from the provided-schema
framework.
Because the schema is now needed in the plugin (rather than the reader),
the schema-parsing code was moved out of the reader into the plugin. The plugin
creates two schemas: an "output schema" with the desired output types,
and a "reader schema" that uses only VARCHAR. This causes the EVF to
perform conversions.
* Enable provided schema support
Allows the user to specify types using either the format config (as
before) or a provided schema. If a schema is provided, columns are
matched by the names specified in the format config.
The provided schema can specify both types and modes (nullable or not
null).
If a schema is provided, then the types specified in the plugin config
are ignored. No attempt is made to merge schemas.
If a schema is provided, but a column is omitted from the schema, the
type defaults to VARCHAR.
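For illustration, a provided schema for such a table might look like the
following sketch (the workspace, table name, and column names here are
hypothetical):

    CREATE OR REPLACE SCHEMA (
        `year` INT NOT NULL,
        `month` INT,
        `day` INT)
    FOR TABLE dfs.test.`logs`

The column names must match the field names given in the format config;
any regex group without a matching schema column falls back to VARCHAR.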
* Added ability to specify regex in table properties
Allows the user to specify the regex, and the column schema,
using a CREATE SCHEMA statement. The README file provides the details.
Unit tests demonstrate and verify the functionality.
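For illustration, the regex and the column schema might both be supplied
in one statement, roughly as sketched below (this assumes the table
property key resolves to "drill.logRegex.regex", per the REGEX_PROP
constant in the plugin; the workspace and pattern are hypothetical):

    CREATE OR REPLACE SCHEMA (
        `year` INT,
        `month` INT)
    FOR TABLE dfs.test.`logs`
    PROPERTIES ('drill.logRegex.regex' = '([0-9]{4})-([0-9]{2}).*')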
* Used the custom error context provided by EVF to enhance the log format
reader error messages.
* Added user name to default EVF error context
* Added support for table functions
Can set the regex and maxErrors fields, but not the schema.
Schema will default to "field_0", "field_1", etc. of type
VARCHAR.
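For illustration, such a table function might look roughly like this
sketch (the file path and pattern are hypothetical; the parameter names
follow the public config fields regex and maxErrors):

    SELECT * FROM table(dfs.test.`some.log1`(
        type => 'logRegex',
        regex => '([0-9]{4})-([0-9]{2})-([0-9]{2}) .*',
        maxErrors => 10))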
* Added unit tests to verify the functionality.
* Added a check, and a test, for a regex with no groups.
* Added columns array support
Previously, when the log regex plugin was given no schema, it
created a list of columns "field_0", "field_1", etc. After
this change, the plugin instead follows the pattern set by
the text plugin: it places all fields into the columns
array. (The two special fields are still separate.)
A few adjustments were necessary to the columns array
framework to allow use of the special columns along with
the `columns` column.
Modified unit tests and the README to reflect this change.
The change should be backward compatible because few users
are likely to be relying on the dummy field names. (See the
example query below.)
Added unit tests to verify that schema-based table
functions work. A test shows that, due to the unfortunate
config property name "schema", users of this plugin cannot
combine a config table function with the schema attribute
in the way promised in DRILL-6965.
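For illustration, with no schema defined, a query can now address the
matched groups through the columns array, as sketched below (the file
path is hypothetical; `_raw` is one of the two special columns):

    SELECT columns[0] AS `year`, columns[1] AS `month`, `_raw`
    FROM dfs.test.`some.log1`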
---
.../impl/scan/columns/ColumnsArrayManager.java | 23 +-
.../impl/scan/columns/ColumnsArrayParser.java | 32 +-
.../impl/scan/columns/ColumnsScanFramework.java | 19 +-
.../impl/scan/project/projSet/TypeConverter.java | 2 +
.../exec/store/easy/text/TextFormatPlugin.java | 4 +-
.../easy/text/reader/CompliantTextBatchReader.java | 6 +-
.../drill/exec/store/log/LogBatchReader.java | 278 ++++++++
.../drill/exec/store/log/LogFormatConfig.java | 57 +-
.../drill/exec/store/log/LogFormatField.java | 61 +-
.../drill/exec/store/log/LogFormatPlugin.java | 402 +++++++++--
.../drill/exec/store/log/LogRecordReader.java | 769 ---------------------
.../java/org/apache/drill/exec/store/log/README.md | 251 ++++++-
.../apache/drill/TestSchemaWithTableFunction.java | 16 +-
.../exec/physical/impl/scan/TestColumnsArray.java | 15 +-
.../impl/scan/TestColumnsArrayFramework.java | 24 +-
.../physical/impl/scan/TestColumnsArrayParser.java | 32 +-
.../apache/drill/exec/store/log/TestLogReader.java | 469 +++++++++++--
.../exec/record/metadata/AbstractPropertied.java | 16 +
.../drill/exec/record/metadata/Propertied.java | 21 +
19 files changed, 1468 insertions(+), 1029 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayManager.java
index 470929c..98fa2fa 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayManager.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.physical.impl.scan.columns;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.exec.physical.impl.scan.project.ColumnProjection;
+import
org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
import org.apache.drill.exec.physical.impl.scan.project.ResolvedTuple;
import
org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.ScanProjectionParser;
-import
org.apache.drill.exec.physical.impl.scan.project.ReaderLevelProjection.ReaderProjectionResolver;
+import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
/**
* Handles the special case in which the entire row is returned as a
@@ -76,14 +79,17 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
public class ColumnsArrayManager implements ReaderProjectionResolver {
- public static final String COLUMNS_COL = "columns";
-
// Internal
private final ColumnsArrayParser parser;
+ public ColumnsArrayManager(boolean requireColumnsArray, boolean
allowOtherCols) {
+ parser = new ColumnsArrayParser(requireColumnsArray, allowOtherCols);
+ }
+
+ @VisibleForTesting
public ColumnsArrayManager(boolean requireColumnsArray) {
- parser = new ColumnsArrayParser(requireColumnsArray);
+ this(requireColumnsArray, false);
}
public ScanProjectionParser projectionParser() { return parser; }
@@ -91,8 +97,7 @@ public class ColumnsArrayManager implements
ReaderProjectionResolver {
public ReaderProjectionResolver resolver() { return this; }
@Override
- public void startResolution() {
- }
+ public void startResolution() { }
@Override
public boolean resolveColumn(ColumnProjection col, ResolvedTuple outputTuple,
@@ -108,13 +113,13 @@ public class ColumnsArrayManager implements
ReaderProjectionResolver {
throw new IllegalStateException("Table schema must have exactly one
column.");
}
- final int tabColIndex = tableSchema.index(COLUMNS_COL);
+ final int tabColIndex =
tableSchema.index(ColumnsScanFramework.COLUMNS_COL);
if (tabColIndex == -1) {
- throw new IllegalStateException("Table schema must include only one
column named `" + COLUMNS_COL + "`");
+ throw new IllegalStateException("Table schema must include only one
column named `" + ColumnsScanFramework.COLUMNS_COL + "`");
}
final MaterializedField tableCol = tableSchema.column(tabColIndex);
if (tableCol.getType().getMode() != DataMode.REPEATED) {
- throw new IllegalStateException("Table schema column `" + COLUMNS_COL +
+ throw new IllegalStateException("Table schema column `" +
ColumnsScanFramework.COLUMNS_COL +
"` is of mode " + tableCol.getType().getMode() +
" but expected " + DataMode.REPEATED);
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
index 24a332d..e62b035 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsArrayParser.java
@@ -25,6 +25,7 @@ import
org.apache.drill.exec.physical.impl.scan.project.ScanLevelProjection.Scan
import org.apache.drill.exec.physical.rowSet.project.RequestedColumnImpl;
import
org.apache.drill.exec.physical.rowSet.project.RequestedTuple.RequestedColumn;
import org.apache.drill.exec.store.easy.text.reader.TextReader;
+import
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
/**
* Parses the `columns` array. Doing so is surprisingly complex.
@@ -71,8 +72,21 @@ public class ColumnsArrayParser implements
ScanProjectionParser {
// Config
+ /**
+ * True if the project list must include either the columns[] array
+ * or the wildcard.
+ */
+
private final boolean requireColumnsArray;
+ /**
+ * True if the project list can include columns other than/in addition to
+ * the columns[] array. Handy if the plugin provides special columns such
+ * as the log regex plugin.
+ */
+
+ private final boolean allowOtherCols;
+
// Internals
private ScanLevelProjection builder;
@@ -81,8 +95,14 @@ public class ColumnsArrayParser implements
ScanProjectionParser {
private UnresolvedColumnsArrayColumn columnsArrayCol;
- public ColumnsArrayParser(boolean requireColumnsArray) {
+ public ColumnsArrayParser(boolean requireColumnsArray, boolean
allowOtherCols) {
this.requireColumnsArray = requireColumnsArray;
+ this.allowOtherCols = allowOtherCols;
+ }
+
+ @VisibleForTesting
+ public ColumnsArrayParser(boolean requireColumnsArray) {
+ this(requireColumnsArray, false);
}
@Override
@@ -92,7 +112,7 @@ public class ColumnsArrayParser implements
ScanProjectionParser {
@Override
public boolean parse(RequestedColumn inCol) {
- if (! requireColumnsArray) {
+ if (! requireColumnsArray && ! allowOtherCols) {
// If we do not require the columns array, then we presume that
// the reader does not provide arrays, so any use of the columns[x]
@@ -111,17 +131,17 @@ public class ColumnsArrayParser implements
ScanProjectionParser {
}
if (inCol.isWildcard()) {
createColumnsCol(
- new RequestedColumnImpl(builder.rootProjection(),
ColumnsArrayManager.COLUMNS_COL));
+ new RequestedColumnImpl(builder.rootProjection(),
ColumnsScanFramework.COLUMNS_COL));
return true;
}
- if (! inCol.nameEquals(ColumnsArrayManager.COLUMNS_COL)) {
+ if (! inCol.nameEquals(ColumnsScanFramework.COLUMNS_COL)) {
return false;
}
// The columns column cannot be a map. That is, the following is
// not allowed: columns.foo.
- if (inCol.isTuple()) {
+ if (inCol.isTuple() && ! allowOtherCols) {
throw UserException
.validationError()
.message("Column `%s` has map elements, but must be an array",
inCol.name())
@@ -174,7 +194,7 @@ public class ColumnsArrayParser implements
ScanProjectionParser {
.addContext(builder.context())
.build(logger);
}
- if (requireColumnsArray) {
+ if (requireColumnsArray && ! allowOtherCols) {
throw UserException
.validationError()
.message("Only `columns` column is allowed. Found: " + col.name())
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
index 6d22dd3..17b3f68 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/columns/ColumnsScanFramework.java
@@ -18,8 +18,11 @@
package org.apache.drill.exec.physical.impl.scan.columns;
import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiatorImpl;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
/**
* Scan framework for a file that supports the special "columns" column.
@@ -36,13 +39,20 @@ import
org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiatorImpl;
public class ColumnsScanFramework extends FileScanFramework {
+ public static final String COLUMNS_COL = "columns";
+
public static class ColumnsScanBuilder extends FileScanBuilder {
protected boolean requireColumnsArray;
+ protected boolean allowOtherCols;
public void requireColumnsArray(boolean flag) {
requireColumnsArray = flag;
}
+ public void allowOtherCols(boolean flag) {
+ allowOtherCols = flag;
+ }
+
@Override
public ScanOperatorEvents buildEvents() {
return new ColumnsScanFramework(this);
@@ -84,8 +94,10 @@ public class ColumnsScanFramework extends FileScanFramework {
@Override
protected void configure() {
super.configure();
+ ColumnsScanBuilder colScanBuilder = ((ColumnsScanBuilder) builder);
columnsArrayManager = new ColumnsArrayManager(
- ((ColumnsScanBuilder) builder).requireColumnsArray);
+ colScanBuilder.requireColumnsArray,
+ colScanBuilder.allowOtherCols);
builder.addParser(columnsArrayManager.projectionParser());
builder.addResolver(columnsArrayManager.resolver());
}
@@ -94,4 +106,9 @@ public class ColumnsScanFramework extends FileScanFramework {
protected SchemaNegotiatorImpl newNegotiator() {
return new ColumnsSchemaNegotiatorImpl(this);
}
+
+ public static TupleMetadata columnsSchema() {
+ return new SchemaBuilder()
+ .addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
+ .buildSchema(); }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
index 064b678..8269116 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/projSet/TypeConverter.java
@@ -59,6 +59,8 @@ public class TypeConverter {
return this;
}
+ public TupleMetadata providedSchema() { return providedSchema; }
+
public Builder transform(TypeConverter.CustomTypeTransform transform) {
this.transform = transform;
return this;
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 243d58f..289d26c 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -44,7 +44,7 @@ import
org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.Propertied;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.RecordWriter;
@@ -90,7 +90,7 @@ public class TextFormatPlugin extends
EasyFormatPlugin<TextFormatPlugin.TextForm
// uses of these names and denotes that the names work only for the text
// format plugin.
- public static final String TEXT_PREFIX = TupleMetadata.DRILL_PROP_PREFIX +
PLUGIN_NAME + ".";
+ public static final String TEXT_PREFIX =
Propertied.pluginPrefix(PLUGIN_NAME);
public static final String HAS_HEADERS_PROP = TEXT_PREFIX + "extractHeader";
public static final String SKIP_FIRST_LINE_PROP = TEXT_PREFIX +
"skipFirstLine";
public static final String DELIMITER_PROP = TEXT_PREFIX + "fieldDelimiter";
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 3b7ffb6..2d86fda 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -24,7 +24,7 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
import
org.apache.drill.exec.physical.impl.scan.columns.ColumnsSchemaNegotiator;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.rowSet.RowSetLoader;
@@ -151,9 +151,7 @@ public class CompliantTextBatchReader implements
ManagedReader<ColumnsSchemaNego
private TextOutput openWithoutHeaders(
ColumnsSchemaNegotiator schemaNegotiator) {
- final TupleMetadata schema = new TupleSchema();
- schema.addColumn(MetadataUtils.newScalar(ColumnsArrayManager.COLUMNS_COL,
MinorType.VARCHAR, DataMode.REPEATED));
- schemaNegotiator.setTableSchema(schema, true);
+ schemaNegotiator.setTableSchema(ColumnsScanFramework.columnsSchema(),
true);
writer = schemaNegotiator.build().writer();
return new RepeatedVarCharOutput(writer,
schemaNegotiator.projectedIndexes());
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
new file mode 100644
index 0000000..f9db144
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.drill.common.exceptions.UserException;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.rowSet.ResultSetLoader;
+import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
+import org.apache.hadoop.mapred.FileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(LogBatchReader.class);
+ public static final String RAW_LINE_COL_NAME = "_raw";
+ public static final String UNMATCHED_LINE_COL_NAME = "_unmatched_rows";
+
+ public static class LogReaderConfig {
+ protected final LogFormatPlugin plugin;
+ protected final Pattern pattern;
+ protected final TupleMetadata schema;
+ protected final boolean asArray;
+ protected final int groupCount;
+ protected final int maxErrors;
+
+ public LogReaderConfig(LogFormatPlugin plugin, Pattern pattern,
+ TupleMetadata schema, boolean asArray,
+ int groupCount, int maxErrors) {
+ this.plugin = plugin;
+ this.pattern = pattern;
+ this.schema = schema;
+ this.asArray = asArray;
+ this.groupCount = groupCount;
+ this.maxErrors = maxErrors;
+ }
+ }
+
+ /**
+ * Write group values to value vectors.
+ */
+
+ private interface VectorWriter {
+ void loadVectors(Matcher m);
+ }
+
+ /**
+ * Write group values to individual scalar columns.
+ */
+
+ private static class ScalarGroupWriter implements VectorWriter {
+
+ private final TupleWriter rowWriter;
+
+ public ScalarGroupWriter(TupleWriter rowWriter) {
+ this.rowWriter = rowWriter;
+ }
+
+ @Override
+ public void loadVectors(Matcher m) {
+ for (int i = 0; i < m.groupCount(); i++) {
+ String value = m.group(i + 1);
+ if (value != null) {
+ rowWriter.scalar(i).setString(value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Write group values to the columns[] array.
+ */
+
+ private static class ColumnsArrayWriter implements VectorWriter {
+
+ private final ScalarWriter elementWriter;
+
+ public ColumnsArrayWriter(TupleWriter rowWriter) {
+ elementWriter = rowWriter.array(0).scalar();
+ }
+
+ @Override
+ public void loadVectors(Matcher m) {
+ for (int i = 0; i < m.groupCount(); i++) {
+ String value = m.group(i + 1);
+ elementWriter.setString(value == null ? "" : value);
+ }
+ }
+ }
+
+ private final LogReaderConfig config;
+ private FileSplit split;
+ private BufferedReader reader;
+ private ResultSetLoader loader;
+ private VectorWriter vectorWriter;
+ private ScalarWriter rawColWriter;
+ private ScalarWriter unmatchedColWriter;
+ private boolean saveMatchedRows;
+ private int lineNumber;
+ private int errorCount;
+
+ public LogBatchReader(LogReaderConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ negotiator.setTableSchema(config.schema, true);
+ loader = negotiator.build();
+ bindColumns(loader.writer());
+ openFile(negotiator);
+ return true;
+ }
+
+ private void bindColumns(RowSetLoader writer) {
+ rawColWriter = writer.scalar(RAW_LINE_COL_NAME);
+ unmatchedColWriter = writer.scalar(UNMATCHED_LINE_COL_NAME);
+ saveMatchedRows = rawColWriter.isProjected();
+
+ // If no match-case columns are projected, and the unmatched
+ // column is unprojected, then we want to count (matched)
+ // rows.
+
+ saveMatchedRows |= !unmatchedColWriter.isProjected();
+
+ // This reader is unusual: it can save only unmatched rows,
+ // save only matched rows, or both. We determine whether to
+ // save matched rows by checking whether any of the "normal"
+ // reader columns are projected (ignoring the two special
+ // columns). If so, create a vector writer to save values.
+
+ if (config.asArray) {
+ saveMatchedRows |= writer.column(0).isProjected();
+ if (saveMatchedRows) {
+ // Save values into the columns[] array
+ vectorWriter = new ColumnsArrayWriter(writer);
+ }
+ } else {
+ for (int i = 0; i < config.schema.size(); i++) {
+ saveMatchedRows |= writer.column(i).isProjected();
+ }
+ if (saveMatchedRows) {
+ // Save values into the individual scalar columns
+ vectorWriter = new ScalarGroupWriter(writer);
+ }
+ }
+ }
+
+ private void openFile(FileSchemaNegotiator negotiator) {
+ InputStream in;
+ try {
+ in = negotiator.fileSystem().open(split.getPath());
+ } catch (Exception e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failed to open input file")
+ .addContext("File path:", split.getPath())
+ .addContext(loader.context())
+ .build(logger);
+ }
+ reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
+ }
+
+ @Override
+ public boolean next() {
+ RowSetLoader rowWriter = loader.writer();
+ while (! rowWriter.isFull()) {
+ if (! nextLine(rowWriter)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean nextLine(RowSetLoader rowWriter) {
+ String line;
+ try {
+ line = reader.readLine();
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Error reading file")
+ .addContext("File", split.getPath())
+ .addContext(loader.context())
+ .build(logger);
+ }
+
+ if (line == null) {
+ return false;
+ }
+ lineNumber++;
+ Matcher lineMatcher = config.pattern.matcher(line);
+ if (lineMatcher.matches()) {
+
+ // Load matched row into vectors.
+
+ if (saveMatchedRows) {
+ rowWriter.start();
+ rawColWriter.setString(line);
+ vectorWriter.loadVectors(lineMatcher);
+ rowWriter.save();
+ }
+ return true;
+ }
+
+ errorCount++;
+ if (errorCount < config.maxErrors) {
+ logger.warn("Unmatched line: {}", line);
+ } else {
+ throw UserException.parseError()
+ .message("Too many errors. Max error threshold exceeded.")
+ .addContext("Line", line)
+ .addContext("Line number", lineNumber)
+ .addContext(loader.context())
+ .build(logger);
+ }
+
+ // For unmatched columns, create an output row only if the
+ // user asked for the unmatched values.
+
+ if (unmatchedColWriter.isProjected()) {
+ rowWriter.start();
+ unmatchedColWriter.setString(line);
+ rowWriter.save();
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (reader == null) {
+ return;
+ }
+ try {
+ reader.close();
+ } catch (IOException e) {
+ logger.warn("Error when closing file: " + split.getPath(), e);
+ } finally {
+ reader = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "LogRecordReader[File=%s, Line=%d]",
+ split.getPath(), lineNumber);
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
index f82c50b..0908a4a 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatConfig.java
@@ -17,22 +17,25 @@
*/
package org.apache.drill.exec.store.log;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Objects;
-import org.apache.drill.common.logical.FormatPluginConfig;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-@JsonTypeName("logRegex")
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.base.Objects;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(LogFormatPlugin.PLUGIN_NAME)
public class LogFormatConfig implements FormatPluginConfig {
- private String regex;
- private String extension;
- private int maxErrors = 10;
- private List<LogFormatField> schema;
+ // Fields must be public for table functions to work: DRILL-6672
+
+ public String regex;
+ public String extension;
+ public int maxErrors = 10;
+ public List<LogFormatField> schema;
public String getRegex() {
return regex;
@@ -52,11 +55,11 @@ public class LogFormatConfig implements FormatPluginConfig {
//Setters
public void setExtension(String ext) {
- this.extension = ext;
+ extension = ext;
}
public void setMaxErrors(int errors) {
- this.maxErrors = errors;
+ maxErrors = errors;
}
public void setRegex(String regex) {
@@ -64,7 +67,7 @@ public class LogFormatConfig implements FormatPluginConfig {
}
public void setSchema() {
- this.schema = new ArrayList<LogFormatField>();
+ schema = new ArrayList<LogFormatField>();
}
@Override
@@ -88,13 +91,18 @@ public class LogFormatConfig implements FormatPluginConfig {
}
@JsonIgnore
+ public boolean hasSchema() {
+ return schema != null && ! schema.isEmpty();
+ }
+
+ @JsonIgnore
public List<String> getFieldNames() {
- List<String> result = new ArrayList<String>();
- if (this.schema == null) {
+ List<String> result = new ArrayList<>();
+ if (! hasSchema()) {
return result;
}
- for (LogFormatField field : this.schema) {
+ for (LogFormatField field : schema) {
result.add(field.getFieldName());
}
return result;
@@ -102,18 +110,21 @@ public class LogFormatConfig implements
FormatPluginConfig {
@JsonIgnore
public String getDataType(int fieldIndex) {
- LogFormatField f = this.schema.get(fieldIndex);
- return f.getFieldType().toUpperCase();
+ LogFormatField field = getField(fieldIndex);
+ return field == null ? null : field.getFieldType();
}
@JsonIgnore
public LogFormatField getField(int fieldIndex) {
- return this.schema.get(fieldIndex);
+ if (schema == null || fieldIndex >= schema.size()) {
+ return null;
+ }
+ return schema.get(fieldIndex);
}
@JsonIgnore
- public String getDateFormat(int patternIndex) {
- LogFormatField f = this.schema.get(patternIndex);
- return f.getFormat();
+ public String getDateFormat(int fieldIndex) {
+ LogFormatField field = getField(fieldIndex);
+ return field == null ? null : field.getFormat();
}
-}
\ No newline at end of file
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
index 64a6db7..d6ab41f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatField.java
@@ -18,35 +18,31 @@
package org.apache.drill.exec.store.log;
+import
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * The three configuration options for a field are:
+ * <ol>
+ * <li>The field name</li>
+ * <li>The data type (fieldType). Field type defaults to VARCHAR
+ * if it is not specified</li>
+ * <li>The format string which is used for date/time fields.
+ * This field is ignored if used with a non date/time field.</li>
+ * </ol>
+ */
@JsonTypeName("regexReaderFieldDescription")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class LogFormatField {
- /*
- * The three configuration options for a field are:
- * 1. The field name
- * 2. The data type (fieldType). Field type defaults to VARCHAR if it is
not specified
- * 3. The format string which is used for date/time fields. This field is
ignored if used with a non
- * date/time field.
- * */
-
- private String fieldName = "";
- private String fieldType = "VARCHAR";
- private String format;
-
- //These will be used in the future for field validation and masking
- //public String validator;
- //public double minValue;
- //public double maxValue;
-
-
- public LogFormatField() {
- }
+ private final String fieldName;
+ private final String fieldType;
+ private final String format;
- //These constructors are used for unit testing
+ @VisibleForTesting
public LogFormatField(String fieldName) {
this(fieldName, null, null);
}
@@ -61,26 +57,9 @@ public class LogFormatField {
this.format = format;
}
- public String getFieldName() {
- return fieldName;
- }
-
- public String getFieldType() {
- return fieldType;
- }
-
- public String getFormat() {
- return format;
- }
-
+ public String getFieldName() { return fieldName; }
- /*
- public String getValidator() { return validator; }
+ public String getFieldType() { return fieldType; }
- public double getMinValue() { return minValue; }
-
- public double getMaxValue() {
- return maxValue;
- }
- */
+ public String getFormat() { return format; }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
index 3f381c8..ce017cd 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogFormatPlugin.java
@@ -18,86 +18,386 @@
package org.apache.drill.exec.store.log;
-import java.io.IOException;
-import org.apache.drill.exec.planner.common.DrillStatsTable;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
+import
org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.Propertied;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.store.log.LogBatchReader.LogReaderConfig;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
-
-import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LogFormatPlugin extends EasyFormatPlugin<LogFormatConfig> {
+ private static final Logger logger =
LoggerFactory.getLogger(LogFormatPlugin.class);
+
+ public static final String PLUGIN_NAME = "logRegex";
+ public static final String PROP_PREFIX =
Propertied.pluginPrefix(PLUGIN_NAME);
+ public static final String REGEX_PROP = PROP_PREFIX + "regex";
+ public static final String MAX_ERRORS_PROP = PROP_PREFIX + "maxErrors";
+
+ private static class LogReaderFactory extends FileReaderFactory {
+ private final LogReaderConfig readerConfig;
- public static final String DEFAULT_NAME = "logRegex";
- private final LogFormatConfig formatConfig;
+ public LogReaderFactory(LogReaderConfig config) {
+ readerConfig = config;
+ }
+
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new LogBatchReader(readerConfig);
+ }
+ }
public LogFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig
storageConfig,
LogFormatConfig formatConfig) {
- super(name, context, fsConf, storageConfig, formatConfig,
- true, // readable
- false, // writable
- true, // blockSplittable
- true, // compressible
- Lists.newArrayList(formatConfig.getExtension()),
- DEFAULT_NAME);
- this.formatConfig = formatConfig;
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig,
formatConfig);
}
- @Override
- public RecordReader getRecordReader(FragmentContext context,
- DrillFileSystem dfs, FileWork fileWork,
List<SchemaPath> columns,
- String userName) throws
ExecutionSetupException {
- return new LogRecordReader(context, dfs, fileWork,
- columns, userName, formatConfig);
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
LogFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = false;
+ // Should be block splittable, but the logic is not yet implemented.
+ config.blockSplittable = false;
+ config.compressible = true;
+ config.supportsProjectPushdown = true;
+ config.extensions = Collections.singletonList(pluginConfig.getExtension());
+ config.fsConf = fsConf;
+ config.defaultName = PLUGIN_NAME;
+ config.readerOperatorType = CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+ config.useEnhancedScan = true;
+ return config;
}
+ /**
+ * Build a file scan framework for this plugin.
+ * <p>
+ * This plugin was created before the concept of "provided schema" was
+ * available. This plugin does, however, support a provided schema and
+ * table properties. The code here handles the various cases.
+ * <p>
+ * For the regex and max errors:
+ * <ul>
+ * <li>Use the table property, if a schema is provided and the table
+ * property is set.</li>
+ * <li>Else, use the format config property.</li>
+ * </ul>
+ * <p>
+ * For columns:
+ * <ul>
+ * <li>If no schema is provided (or the schema contains only table
+ * properties with no columns), use the column names and types from
+ * the plugin config.</li>
+ * <li>If a schema is provided, and the plugin defines columns, use
+ * the column types from the provided schema. Columns are matched by
+ * name. Provided schema types override any types specified in the
+ * plugin.</li>
+ * <li>If a schema is provided, and the plugin config defines no
+ * columns, use the column names and types from the provided schema.
+ * The columns are assumed to appear in the same order as regex
+ * fields.</li>
+ * <li>If the regex has more groups than either schema has columns,
+ * fill the extras with field_n of type VARCHAR.</li>
+ * </ul>
+ * <p>
+ * Typical use cases:
+ * <ul>
+ * <li>Minimum config: only a regex in either plugin config or table
+ * properties.</li>
+ * <li>Plugin config defines regex, field names and types. (The
+ * typical approach in Drill 1.16 and before.)</li>
+ * <li>Plugin config defines the regex and field names. The provided
+ * schema defines types. (Separates physical and logical table
+ * definitions.)</li>
+ * <li>Provided schema defines the regex and columns. May simplify
+ * configuration as all table information is in one place. Allows
+ * different regex patterns for different tables of the same file
+ * suffix.</li>
+ * </ul>
+ */
+
@Override
- public boolean supportsPushDown() {
- return true;
+ protected FileScanBuilder frameworkBuilder(
+ OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+
+ // Pattern and schema identical across readers; define
+ // up front.
+
+ TupleMetadata providedSchema = scan.getSchema();
+ Pattern pattern = setupPattern(providedSchema);
+
+ // Use a dummy matcher to get the group count. Group count not
+ // available from the pattern itself, oddly.
+
+ Matcher m = pattern.matcher("dummy");
+ int groupCount = m.groupCount();
+ if (groupCount == 0) {
+ throw UserException
+ .validationError()
+ .message("Regex property has no groups, see Java Pattern class for
details.")
+ .addContext("Plugin", PLUGIN_NAME)
+ .build(logger);
+ }
+
+ boolean hasColumns = (providedSchema != null && providedSchema.size() > 0);
+ boolean hasSchema = hasColumns || formatConfig.hasSchema();
+ SchemaBuilder schemaBuilder = new SchemaBuilder();
+ FileScanBuilder builder;
+ if (hasSchema) {
+ TupleMetadata outputSchema;
+ if (!hasColumns) {
+
+ // No provided schema, build from plugin config
+ // (or table function.)
+
+ outputSchema = defineOutputSchemaFromConfig(groupCount);
+ } else if (groupCount <= providedSchema.size()) {
+
+ // Is a provided schema with enough columns. Just
+ // use it.
+
+ outputSchema = providedSchema;
+ } else {
+
+ // Have a provided schema, but more groups than
+ // provided columns. Make up additional columns.
+
+ outputSchema = defineOutputSchemaFromProvided(providedSchema,
groupCount);
+ }
+ defineReaderSchema(schemaBuilder, outputSchema);
+
+ // Use the file framework to enable support for implicit and partition
+ // columns.
+
+ builder = new FileScanBuilder();
+ initScanBuilder(builder, scan);
+ builder.typeConverterBuilder().providedSchema(outputSchema);
+ } else {
+
+ // No schema provided; use the columns framework to use the columns[]
array
+ // Also supports implicit and partition metadata.
+
+ schemaBuilder.addArray(ColumnsScanFramework.COLUMNS_COL,
MinorType.VARCHAR);
+ ColumnsScanBuilder colScanBuilder = new ColumnsScanBuilder();
+ initScanBuilder(colScanBuilder, scan);
+ colScanBuilder.requireColumnsArray(true);
+ colScanBuilder.allowOtherCols(true);
+ builder = colScanBuilder;
+ }
+
+ // Pass along the class that will create a batch reader on demand for
+ // each input file.
+
+ builder.setReaderFactory(new LogReaderFactory(
+ new LogReaderConfig(this, pattern, buildSchema(schemaBuilder),
+ !hasSchema, groupCount, maxErrors(providedSchema))));
+
+ // The default type of regex columns is nullable VarChar,
+ // so let's use that as the missing column type.
+
+ builder.setNullType(Types.optional(MinorType.VARCHAR));
+ return builder;
}
- @Override
- public RecordWriter getRecordWriter(FragmentContext context,
- EasyWriter writer) throws
UnsupportedOperationException {
- throw new UnsupportedOperationException("unimplemented");
+ /**
+ * Define the output schema: the schema after type conversions.
+ * Does not include the special columns as those are added only when
+ * requested, and are always VARCHAR.
+ *
+ * @param capturingGroups the number of capturing groups in the regex
+ * (the number that return fields)
+ * @return a schema for the plugin config with names and types filled
+ * in. This schema drives type conversions if no schema is provided
+ * for the table
+ */
+
+ private TupleMetadata defineOutputSchemaFromConfig(int capturingGroups) {
+ List<String> fields = formatConfig.getFieldNames();
+ for (int i = fields.size(); i < capturingGroups; i++) {
+ fields.add("field_" + i);
+ }
+ SchemaBuilder builder = new SchemaBuilder();
+ for (int i = 0; i < capturingGroups; i++) {
+ makeColumn(builder, fields.get(i), i);
+ }
+ TupleMetadata schema = builder.buildSchema();
+
+ // Populate the date formats, if provided.
+
+ if (formatConfig.getSchema() == null) {
+ return schema;
+ }
+ for (int i = 0; i < formatConfig.getSchema().size(); i++) {
+ ColumnMetadata col = schema.metadata(i);
+ switch (col.type()) {
+ case DATE:
+ case TIMESTAMP:
+ case TIME:
+ break;
+ default:
+ continue;
+ }
+ String format = formatConfig.getDateFormat(i);
+ if (format == null) {
+ continue;
+ }
+ col.setProperty(ColumnMetadata.FORMAT_PROP, format);
+ }
+ return schema;
}
- @Override
- public int getReaderOperatorType() {
- return UserBitShared.CoreOperatorType.REGEX_SUB_SCAN_VALUE;
+ /**
+ * Build the output schema from the provided schema. Occurs in the case
+ * that the regex has more capturing groups than the schema has columns.
+ * Add all the provided columns, then add default fields for the
+ * remaining capturing groups.
+ *
+ * @param providedSchema the schema provided for the table
+ * @param capturingGroups the number of groups in the regex
+ * @return an output schema with the provided columns, plus default
+ * columns for any missing columns
+ */
+
+ private TupleMetadata defineOutputSchemaFromProvided(
+ TupleMetadata providedSchema, int capturingGroups) {
+ assert capturingGroups >= providedSchema.size();
+ SchemaBuilder builder = new SchemaBuilder();
+ for (int i = 0; i < providedSchema.size(); i++) {
+ builder.addColumn(providedSchema.metadata(i).copy());
+ }
+ for (int i = providedSchema.size(); i < capturingGroups; i++) {
+ builder.addNullable("field_" + i, MinorType.VARCHAR);
+ }
+ return builder.buildSchema();
}
- @Override
- public int getWriterOperatorType() {
- throw new UnsupportedOperationException("unimplemented");
+ /**
+ * Define the simplified reader schema: this is the format that the reader
+ * understands. All columns are VARCHAR, and the reader can offer the
+ * two special columns.
+ *
+ * @param outputSchema the output schema, defined above, with types
+ * filled in from the plugin config
+ * @return the reader schema: same column names, all columns of type
+ * VARCHAR (which is what a regex produces), with special columns added.
+ * The projection mechanism will pick all, some or none of these columns
+ * to project, then will take the desired output type from the output
+ * schema, providing any conversions needed
+ */
+
+ private void defineReaderSchema(SchemaBuilder builder, TupleMetadata
outputSchema) {
+ for (int i = 0; i < outputSchema.size(); i++) {
+ builder.addNullable(outputSchema.metadata(i).name(), MinorType.VARCHAR);
+ }
}
- @Override
- public boolean supportsStatistics() {
- return false;
+ private TupleMetadata buildSchema(SchemaBuilder builder) {
+ builder.addNullable(LogBatchReader.RAW_LINE_COL_NAME, MinorType.VARCHAR);
+ builder.addNullable(LogBatchReader.UNMATCHED_LINE_COL_NAME,
MinorType.VARCHAR);
+ TupleMetadata schema = builder.buildSchema();
+
+ // Exclude special columns from wildcard expansion
+
+ schema.metadata(LogBatchReader.RAW_LINE_COL_NAME).setBooleanProperty(
+ ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+ schema.metadata(LogBatchReader.UNMATCHED_LINE_COL_NAME).setBooleanProperty(
+ ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+
+ return schema;
}
- @Override
- public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path
statsTablePath) throws IOException {
- return null;
+ /**
+ * Determine the regex to use. Use the one from the REGEX_PROP table
+ * property of the provided schema, if available. Else, use the one from
+ * the plugin config. Then compile the regex. Issue an error if the
+ * pattern is bad.
+ */
+
+ private Pattern setupPattern(TupleMetadata providedSchema) {
+ String regex = formatConfig.getRegex();
+ if (providedSchema != null) {
+ regex = providedSchema.property(REGEX_PROP, regex);
+ }
+ if (Strings.isNullOrEmpty(regex)) {
+ throw UserException
+ .validationError()
+ .message("Regex property is required")
+ .addContext("Plugin", PLUGIN_NAME)
+ .build(logger);
+ }
+ try {
+ return Pattern.compile(regex);
+ } catch (PatternSyntaxException e) {
+ throw UserException
+ .validationError(e)
+ .message("Failed to parse regex: \"%s\"", regex)
+ .build(logger);
+ }
}
- @Override
- public void writeStatistics(DrillStatsTable.TableStatistics statistics,
FileSystem fs, Path statsTablePath) throws IOException {
+ private void makeColumn(SchemaBuilder builder, String name, int
patternIndex) {
+ String typeName = formatConfig.getDataType(patternIndex);
+ MinorType type;
+ if (Strings.isNullOrEmpty(typeName)) {
+ // No type name. VARCHAR is a safe guess
+ type = MinorType.VARCHAR;
+ } else {
+ type = MinorType.valueOf(typeName.toUpperCase());
+ }
+
+ // Verify supported types
+ switch (type) {
+ case VARCHAR:
+ case INT:
+ case SMALLINT:
+ case BIGINT:
+ case FLOAT4:
+ case FLOAT8:
+ case DATE:
+ case TIMESTAMP:
+ case TIME:
+ break;
+ default:
+ throw UserException
+ .validationError()
+ .message("Undefined column types")
+ .addContext("Position", patternIndex)
+ .addContext("Field name", name)
+ .addContext("Type", typeName)
+ .build(logger);
+ }
+ builder.addNullable(name, type);
+ }
+ public int maxErrors(TupleMetadata providedSchema) {
+ if (providedSchema == null) {
+ return formatConfig.getMaxErrors();
+ }
+ return providedSchema.intProperty(MAX_ERRORS_PROP,
+ formatConfig.getMaxErrors());
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
deleted file mode 100644
index c393b6f..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ /dev/null
@@ -1,769 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.log;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableSmallIntVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.NullableTimeStampVector;
-import org.apache.drill.exec.vector.NullableTimeVector;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
-public class LogRecordReader extends AbstractRecordReader {
-
- private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
-
- private abstract static class ColumnDefn {
- private final String name;
- private final int index;
- private final String format;
-
- public ColumnDefn(String name, int index) {
- this(name, index, null);
- }
-
- public ColumnDefn(String name, int index, String format) {
- this.name = name;
- this.index = index;
- this.format = format;
- }
-
- public abstract void define(OutputMutator outputMutator) throws
SchemaChangeException;
-
- public abstract void load(int rowIndex, String value);
-
- public String getName() { return this.name; }
-
- public int getIndex() { return this.index; }
-
- public String getFormat() { return this.format;}
-
- @Override
- //For testing
- public String toString() {
- return "Name: " + name + ", Index: " + index + ", Format: " + format;
- }
- }
-
- private static class VarCharDefn extends ColumnDefn {
-
- private NullableVarCharVector.Mutator mutator;
-
- public VarCharDefn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.VARCHAR));
- mutator = outputMutator.addField(field,
NullableVarCharVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- byte[] bytes = value.getBytes();
- mutator.setSafe(rowIndex, bytes, 0, bytes.length);
- }
- }
-
- private static class BigIntDefn extends ColumnDefn {
-
- private NullableBigIntVector.Mutator mutator;
-
- public BigIntDefn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.BIGINT));
- mutator = outputMutator.addField(field,
NullableBigIntVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- mutator.setSafe(rowIndex, Long.parseLong(value));
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an INT field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class SmallIntDefn extends ColumnDefn {
-
- private NullableSmallIntVector.Mutator mutator;
-
- public SmallIntDefn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.SMALLINT));
- mutator = outputMutator.addField(field,
NullableSmallIntVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- mutator.setSafe(rowIndex, Short.parseShort(value));
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an INT field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class IntDefn extends ColumnDefn {
-
- private NullableIntVector.Mutator mutator;
-
- public IntDefn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.INT));
- mutator = outputMutator.addField(field,
NullableIntVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- mutator.setSafe(rowIndex, Integer.parseInt(value));
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an INT field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class Float4Defn extends ColumnDefn {
-
- private NullableFloat4Vector.Mutator mutator;
-
- public Float4Defn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.FLOAT4));
- mutator = outputMutator.addField(field,
NullableFloat4Vector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- mutator.setSafe(rowIndex, Float.parseFloat(value));
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an FLOAT field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class DoubleDefn extends ColumnDefn {
-
- private NullableFloat8Vector.Mutator mutator;
-
- public DoubleDefn(String name, int index) {
- super(name, index);
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.FLOAT8));
- mutator = outputMutator.addField(field,
NullableFloat8Vector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- mutator.setSafe(rowIndex, Double.parseDouble(value));
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an FLOAT field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class DateDefn extends ColumnDefn {
-
- private NullableDateVector.Mutator mutator;
- private SimpleDateFormat df;
-
- public DateDefn(String name, int index, String dateFormat) {
- super(name, index, dateFormat);
- df = getValidDateObject(dateFormat);
- }
-
- private SimpleDateFormat getValidDateObject(String d) {
- SimpleDateFormat tempDateFormat;
- if (d != null && !d.isEmpty()) {
- tempDateFormat = new SimpleDateFormat(d);
- } else {
- throw UserException.parseError()
- .message("Invalid date format. The date formatting string was
empty.")
- .build(logger);
- }
- return tempDateFormat;
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.DATE));
- mutator = outputMutator.addField(field,
NullableDateVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- Date d = df.parse(value);
- long milliseconds = d.getTime();
- mutator.setSafe(rowIndex, milliseconds);
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an DATE field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- } catch (ParseException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Date Format String does not match field value.")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Format String", getFormat())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class TimeDefn extends ColumnDefn {
-
- private NullableTimeVector.Mutator mutator;
- private SimpleDateFormat df;
-
- public TimeDefn(String name, int index, String dateFormat) {
- super(name, index, dateFormat);
- df = getValidDateObject(dateFormat);
- }
-
- private SimpleDateFormat getValidDateObject(String d) {
- SimpleDateFormat tempDateFormat;
- if (d != null && !d.isEmpty()) {
- tempDateFormat = new SimpleDateFormat(d);
- } else {
- throw UserException.parseError()
- .message("Invalid date format. The date formatting string was
empty.")
- .build(logger);
- }
- return tempDateFormat;
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.TIME));
- mutator = outputMutator.addField(field,
NullableTimeVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- Date d = df.parse(value);
- int milliseconds = (int) d.getTime();
- mutator.setSafe(rowIndex, milliseconds);
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse an Time field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- } catch (ParseException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Date Format String does not match field value.")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Format String", getFormat())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static class TimeStampDefn extends ColumnDefn {
-
- private NullableTimeStampVector.Mutator mutator;
- private SimpleDateFormat df;
-
- public TimeStampDefn(String name, int index, String dateFormat) {
- super(name, index, dateFormat);
- df = getValidDateObject(dateFormat);
- }
-
- private SimpleDateFormat getValidDateObject(String d) {
- SimpleDateFormat tempDateFormat;
- if (d != null && !d.isEmpty()) {
- tempDateFormat = new SimpleDateFormat(d);
- } else {
- throw UserException.parseError()
- .message("Invalid date format. The date formatting string was
empty.")
- .build(logger);
- }
- return tempDateFormat;
- }
-
- @Override
- public void define(OutputMutator outputMutator) throws
SchemaChangeException {
- MaterializedField field = MaterializedField.create(getName(),
- Types.optional(MinorType.TIMESTAMP));
- mutator = outputMutator.addField(field,
NullableTimeStampVector.class).getMutator();
- }
-
- @Override
- public void load(int rowIndex, String value) {
- try {
- Date d = df.parse(value);
- long milliseconds = d.getTime();
- mutator.setSafe(rowIndex, milliseconds);
- } catch (NumberFormatException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Failed to parse a TIMESTAMP field")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Value", value)
- .build(logger);
- } catch (ParseException e) {
- throw UserException
- .dataReadError(e)
- .addContext("Date Format String does not match field value.")
- .addContext("Column", getName())
- .addContext("Position", getIndex())
- .addContext("Format String", getFormat())
- .addContext("Value", value)
- .build(logger);
- }
- }
- }
-
- private static final int BATCH_SIZE =
BaseValueVector.INITIAL_VALUE_ALLOCATION;
-
- private final DrillFileSystem dfs;
- private final FileWork fileWork;
- private final String userName;
- private final LogFormatConfig formatConfig;
- private ColumnDefn columns[];
- private Pattern pattern;
- private BufferedReader reader;
- private int rowIndex;
- private int capturingGroups;
- private OutputMutator outputMutator;
- private int unmatchedColumnIndex;
- private int unmatchedRowIndex;
- private boolean unmatchedRows;
- private int maxErrors;
-
-
- private int errorCount;
-
-
- public LogRecordReader(FragmentContext context, DrillFileSystem dfs,
- FileWork fileWork, List<SchemaPath> columns, String
userName,
- LogFormatConfig formatConfig) {
- this.dfs = dfs;
- this.fileWork = fileWork;
- this.userName = userName;
- this.formatConfig = formatConfig;
- this.unmatchedColumnIndex = -1;
- this.unmatchedRowIndex = 0;
- this.unmatchedRows = false;
- this.maxErrors = formatConfig.getMaxErrors();
-
- // Ask the superclass to parse the projection list.
- setColumns(columns);
-
- if (maxErrors < 0) {
- throw UserException
- .validationError()
- .message("Max Errors must be a positive integer greater than zero.")
- .build(logger);
- }
-
- }
-
- @Override
- public void setup(final OperatorContext context, final OutputMutator output)
{
- this.outputMutator = output;
-
- setupPattern();
- openFile();
- setupProjection();
- defineVectors();
- }
-
- private void setupPattern() {
- try {
- this.pattern = Pattern.compile(this.formatConfig.getRegex());
- Matcher m = pattern.matcher("test");
- capturingGroups = m.groupCount();
- } catch (PatternSyntaxException e) {
- throw UserException
- .validationError(e)
- .message("Failed to parse regex: \"%s\"", formatConfig.getRegex())
- .build(logger);
- }
- }
-
- private void setupProjection() {
- if (isSkipQuery()) {
- projectNone();
- } else if (isStarQuery()) {
- projectAll();
- } else {
- projectSubset();
- }
- }
-
- private void projectNone() {
- columns = new ColumnDefn[]{new VarCharDefn("dummy", -1)};
- }
-
- private void openFile() {
- InputStream in;
- try {
- in = dfs.open(fileWork.getPath());
- } catch (Exception e) {
- throw UserException
- .dataReadError(e)
- .message("Failed to open open input file: %s", fileWork.getPath())
- .addContext("User name", userName)
- .build(logger);
- }
- reader = new BufferedReader(new InputStreamReader(in, Charsets.UTF_8));
- }
-
- private void projectAll() {
- List<String> fields = formatConfig.getFieldNames();
- for (int i = fields.size(); i < capturingGroups; i++) {
- fields.add("field_" + i);
- }
- columns = new ColumnDefn[capturingGroups];
-
- for (int i = 0; i < capturingGroups; i++) {
- columns[i] = makeColumn(fields.get(i), i);
- }
- }
-
- private void projectSubset() {
- Collection<SchemaPath> project = this.getColumns();
- assert !project.isEmpty();
- columns = new ColumnDefn[project.size()];
-
- List<String> fields = formatConfig.getFieldNames();
- int colIndex = 0;
-
-
- for (SchemaPath column : project) {
- if (column.getAsNamePart().hasChild()) {
- throw UserException
- .validationError()
- .message("The log format plugin supports only simple columns")
- .addContext("Projected column", column.toString())
- .build(logger);
- }
-
- String name = column.getAsNamePart().getName();
-
- //Need this to retrieve unnamed fields
- Pattern r = Pattern.compile("^field_(\\d+)$");
- Matcher m = r.matcher(name);
- int patternIndex = -1;
-
- if (name.equals("_unmatched_rows")) {
- //Set boolean flag to true
- this.unmatchedRows = true;
- this.unmatchedColumnIndex = colIndex;
- } else if (m.find()) {
- //if no fields are defined in the configuration, then all the fields
have names of 'field_n'
- //Therefore n is the column index
- patternIndex = Integer.parseInt(m.group(1));
- } else {
- for (int i = 0; i < fields.size(); i++) {
- if (fields.get(i).equalsIgnoreCase(name) ||
- fields.get(i).equals("_raw") ||
- fields.get(i).equals("_unmatched_rows")
- ) {
- patternIndex = i;
-
- break;
- }
- }
- }
- columns[colIndex++] = makeColumn(name, patternIndex);
- }
-
- }
-
- private ColumnDefn makeColumn(String name, int patternIndex) {
- String typeName = null;
- if (patternIndex <= -1 || formatConfig.getSchema() == null) {
- // Use VARCHAR for missing columns
- // (instead of Drill standard of nullable int)
- typeName = MinorType.VARCHAR.name();
- } else if (patternIndex < formatConfig.getSchema().size()) {
- //typeName = formatConfig.getDataType(patternIndex);
- LogFormatField tempField = formatConfig.getField(patternIndex);
- typeName = tempField.getFieldType().toUpperCase();
- }
- if (typeName == null) {
- // No type name. VARCHAR is a safe guess
- typeName = MinorType.VARCHAR.name();
- }
- if (name.equals("_raw") || name.equals("_unmatched_rows")) {
- return new VarCharDefn(name, patternIndex);
- }
-
- MinorType type = MinorType.valueOf(typeName);
- //System.out.println( "Type name: " + typeName + " Type: " + type);
- switch (type) {
- case VARCHAR:
- return new VarCharDefn(name, patternIndex);
- case INT:
- return new IntDefn(name, patternIndex);
- case SMALLINT:
- return new SmallIntDefn(name, patternIndex);
- case BIGINT:
- return new BigIntDefn(name, patternIndex);
- case FLOAT4:
- return new Float4Defn(name, patternIndex);
- case FLOAT8:
- return new DoubleDefn(name, patternIndex);
- case DATE:
- return new DateDefn(name, patternIndex,
formatConfig.getDateFormat(patternIndex));
- case TIMESTAMP:
- return new TimeStampDefn(name, patternIndex,
formatConfig.getDateFormat(patternIndex));
- case TIME:
- return new TimeDefn(name, patternIndex,
formatConfig.getDateFormat(patternIndex));
- default:
- throw UserException
- .validationError()
- .message("Undefined column types")
- .addContext("Position", patternIndex)
- .addContext("Field name", name)
- .addContext("Type", typeName)
- .build(logger);
- }
- }
-
- private void defineVectors() {
- for (int i = 0; i < columns.length; i++) {
- try {
- columns[i].define(outputMutator);
- } catch (SchemaChangeException e) {
- throw UserException
- .systemError(e)
- .message("Vector creation failed")
- .build(logger);
- }
- }
- }
-
- @Override
- public int next() {
- rowIndex = 0;
- while (nextLine()) {
- }
- return rowIndex;
- }
-
- private boolean nextLine() {
- String line;
- try {
- line = reader.readLine();
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .message("Error reading file:")
- .addContext("File", fileWork.getPath())
- .build(logger);
- }
-
- if (line == null) {
- return false;
- }
- Matcher lineMatcher = pattern.matcher(line);
- if (lineMatcher.matches()) {
- loadVectors(lineMatcher);
- return rowIndex < BATCH_SIZE;
- }
-
- errorCount++;
- if (errorCount < maxErrors) {
- logger.warn("Unmatached line: {}", line);
- } else if (errorCount > maxErrors) {
- throw UserException.parseError()
- .message("Too many errors. Max error threshold exceeded.")
- .addContext("Line", line)
- .addContext("Line number", rowIndex)
- .build(logger);
- }
- //If the user asked for the unmatched columns display them
- if (unmatchedRows) {
- //If the user asked for the unmatched columns AND other columns
- if (columns.length > 1) {
- columns[unmatchedColumnIndex].load(rowIndex, line);
- rowIndex++;
- return rowIndex < BATCH_SIZE;
- } else {
- //If the user ONLY asked for the unmatched columns
- columns[unmatchedColumnIndex].load(unmatchedRowIndex, line);
- unmatchedRowIndex++;
- rowIndex = unmatchedRowIndex;
- return unmatchedRowIndex < BATCH_SIZE;
- }
- }
-
- return true;
- }
-
- private void loadVectors(Matcher m) {
- String value = null;
- /*if( unmatchedRows && columns.length == 1 ){
- return;
- }*/
-
- for (int i = 0; i < columns.length; i++) {
- //Skip the unmatched rows column
- if (columns[i].name.equals("_unmatched_rows")) {
- continue;
- }
-
- if (columns[i].index >= 0) {
- //Get the value of the regex group
- value = m.group(columns[i].index + 1);
-
- //If the value is not null, assign it to the column
- if (value != null) {
- columns[i].load(rowIndex, value);
- }
- } else if (columns[i].name.equals("_raw")) {
- //Special case. The first is if the query contains the _raw column
- value = m.group(0);
- if (value != null) {
- columns[i].load(rowIndex, value);
- } else {
- rowIndex++;
- }
- }
- }
- rowIndex++;
- }
-
- @Override
- public void close() {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- logger.warn("Error when closing file: " + fileWork.getPath(), e);
- }
- reader = null;
- }
- }
-
- @Override
- public String toString() {
- return "LogRecordReader[File=" + fileWork.getPath()
- + ", Line=" + rowIndex
- + "]";
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
index 9110c55..802d633 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/README.md
@@ -1,8 +1,28 @@
# Drill Regex/Logfile Plugin
-Plugin for Apache Drill that allows Drill to read and query arbitrary files
where the schema can be defined by a regex. The original intent was for this
to be used for log files, however, it can be used for any structured data.
-## Example Use Case: MySQL Log
-If you wanted to analyze log files such as the MySQL log sample shown below
using Drill, it may be possible using various string fucntions, or you could
write a UDF specific to this data however, this is time consuming, difficult
and not reusable.
+Plugin for Apache Drill that allows Drill to read and query arbitrary files
+where the schema can be defined by a regex. The original intent was for this
+to be used for log files; however, it can be used for any structured data.
+
+The plugin provides multiple ways to define the regex:
+
+* In the format plugin configuration
+* Via Drill's Provided Schema feature
+* Via a Table Function
+
+The plugin provides multiple ways to associate a schema with your regex:
+
+* In the format plugin configuration
+* Via Drill's Provided Schema feature
+* Implicit schema: columns are returned as a `columns` array
+(Handy when using table functions to provide the regex.)
+
+## Example Use Case: MySQL Log
+
+If you wanted to analyze log files such as the MySQL log sample shown below
+using Drill, it may be possible using various string functions, or you could
+write a UDF specific to this data; however, this is time-consuming, difficult,
+and not reusable.
```
070823 21:00:32 1 Connect root@localhost on test1
@@ -11,26 +31,66 @@ If you wanted to analyze log files such as the MySQL log
sample shown below usin
070917 16:29:01 21 Query select * from location
070917 16:29:12 21 Query select * from location where id = 1 LIMIT 1
```
-This plugin will allow you to configure Drill to directly query logfiles of
any configuration.
+
+Using this plugin, you can configure Drill to directly query log files of
+any configuration.
## Configuration Options
-* **`type`**: This tells Drill which extension to use. In this case, it must
be `logRegex`. This field is mandatory.
-* **`regex`**: This is the regular expression which defines how the log file
lines will be split. You must enclose the parts of the regex in grouping
parentheses that you wish to extract. Note that this plugin uses Java regular
expressions and requires that shortcuts such as `\d` have an additional slash:
ie `\\d`. This field is mandatory.
-* **`extension`**: This option tells Drill which file extensions should be
mapped to this configuration. Note that you can have multiple configurations
of this plugin to allow you to query various log files. This field is
mandatory.
-* **`maxErrors`**: Log files can be inconsistent and messy. The `maxErrors`
variable allows you to set how many errors the reader will ignore before
halting execution and throwing an error. Defaults to 10.
-* **`schema`**: The `schema` field is where you define the structure of the
log file. This section is optional. If you do not define a schema, all fields
will be assigned a column name of `field_n` where `n` is the index of the
field. The undefined fields will be assigned a default data type of `VARCHAR`.
+
+* **type**: Must be `logRegex`.
+* **regex**: The regular expression which defines how the log file lines
+will be split. You must enclose in grouping parentheses the parts of the
+regex that you wish to extract. Note that this plugin uses Java regular
+expressions and requires that shortcuts such as `\d` have an additional
+backslash: i.e., `\\d`. Required.
+* **extension**: File extensions to be
+mapped to this configuration. Note that you can have multiple configurations
+of this plugin to allow you to query various log files. Required.
+* **maxErrors**: Log files can be inconsistent and messy. Specifies the
+number of errors the reader will ignore before
+halting execution with an error. Defaults to 10.
+* **schema**: Defines the structure of
+the log file. Optional. If you do not define a schema, all fields are placed
+into the `columns` array, as described below. If you define only some of the
+fields, the undefined fields will be assigned a column name of `field_n`,
+where `n` is the index of the field, and a default data type of `VARCHAR`.
+You typically should define a schema column for each group of your regex.
### Defining a Schema
-The schema variable is an JSON array of fields which have at the moment, three
possible variables:
-* **`fieldName`**: This is the name of the field.
-* **`fieldType`**: Defines the data type. Defaults to `VARCHAR` if
undefined. At the time of writing, the reader supports: `VARCHAR`, `INT`,
`SMALLINT`, `BIGINT`, `FLOAT4`, `FLOAT8`, `DATE`, `TIMESTAMP`, `TIME`.
-* **`format`**: Defines the for date/time fields. This is mandatory if the
field is a date/time field.
-In the future, it is my hope that the schema section will allow for data
masking, validation and other transformations that are commonly used for
analysis of log files.
+The easiest way to use the plugin is simply to define the regex only. In this
+case, the plugin, like the text plugin, will place all matching fields into a
+single column: the `columns` array column.
-### Example Configuration:
-The configuration below demonstrates how to configure Drill to query the
example MySQL log file shown above.
+You can also define a schema consisting of column names and, optionally,
+column types. The schema variable is a JSON array of fields, each of which
+has three attributes:
+
+* **fieldName**: Field name. Must be a valid Drill field name and must be
+unique. Required.
+* **fieldType**: Field data type. Defaults to `VARCHAR` if undefined.
+The reader supports: `VARCHAR`, `INT`, `SMALLINT`,
+`BIGINT`, `FLOAT4`, `FLOAT8`, `DATE`, `TIMESTAMP`, `TIME`. (See note
+below about using `CREATE SCHEMA` as an alternative way to specify types.)
+* **format**: Format for date/time fields. Defaults to ISO format.
+
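+For example, a schema entry for the date field of the MySQL log sample shown
+above might look like this (the field name and format value are illustrative):
+
+```
+{"fieldName": "eventDate", "fieldType": "DATE", "format": "yyMMdd"}
+```
+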
+If you provide at least one field, but fewer fields than the number of regex
+groups, then Drill will still extract all groups and will give the remaining
+fields the name `field_n`. The fields are indexed from `0`. Therefore, if you
+have a dataset with 5 fields and you have named the first two `first` and
+`second`, then the following query would be valid:
+
+```
+SELECT first, second, field_2, field_3, field_4
+FROM ..
+```
+
+### Example Configuration
+
+The configuration below demonstrates how to configure Drill to query the
+example MySQL log file shown above.
```
"log" : {
@@ -63,24 +123,169 @@ The configuration below demonstrates how to configure
Drill to query the example
}
```
-
## Example Usage
-This format plugin gives you two options for querieng fields. If you define
the fields, you can query them as you would any other data source. If you do
nof define a field in the column `schema` variable, Drill will extract all
fields and give them the name `field_n`. The fields are indexed from `0`.
Therefore if you have a dataset with 5 fields the following query would be
valid:
+This format plugin gives you two options for querying fields. If you define
+the fields, you can query them as you would any other data source:
```
-SELECT field_0, field_1, field_2, field_3, field_4
+SELECT eventDate, eventTime, PID
FROM ..
```
-### Implicit Fields
-In addition to the fields which the user defines, the format plugin has two
implicit fields whcih can be useful for debugging your regex. These fields do
not appear in `SELECT *` queries and only will be retrieved when included in a
query.
+If you define a regex, but no field schema, then use either the wildcard
+or `columns`:
+
+```
+SELECT * FROM ...
+SELECT columns FROM ...
+```
+
+Without a schema, you can select specific columns as in the text plugin by
+specifying array indexes:
+
+```
+SELECT columns[0], columns[2] FROM ...
+```
+
+## Implicit Fields
+
+In addition to the fields which the user defines, the format plugin has two
+implicit fields which can be useful for debugging your regex. These fields
+do not appear in wildcard (`SELECT *`) queries. They are included only via
+an explicit projection: `SELECT _raw, _unmatched_rows, ...`.
+
+* **_raw**: Returns the input line matched by your regex.
+* **_unmatched_rows**: Returns input lines which *did not* match the regex.
+For example, if you have a data file of 10 lines, 8 of which match,
+`SELECT _unmatched_rows` will return two rows. If, however, you combine this
+with another field, such as `_raw`, then `_unmatched_rows` will be `null` for
+rows that match and will have a value for rows that do not.
+
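+For example, a query such as the following (the table path is illustrative)
+returns the matched text of each line plus any lines that did not match:
+
+```
+SELECT _raw, _unmatched_rows FROM dfs.example.`mylog.log`
+```
+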
+This plugin also supports Drill's standard implicit file and partition columns.
+
+## Provided Schema
+
+Drill 1.16 introduced the `CREATE SCHEMA` command to allow you to define the
+schema for your table. This plugin was created earlier. Here is how the two
+schema systems interact.
+
+### Plugin Config Provides Regex and Field Names
+
+The first way to use the provided schema is just to define column types.
+In this use case, the plugin config provides the physical layout (pattern
+and column names), while the provided schema provides data types and default
+values (for missing columns).
+
+In this case:
+
+* The plugin config must provide the regex.
+* The plugin config provides the list of column names. (If not provided,
+the names will be `field_0`, `field_1`, etc.)
+* The plugin config should not provide column types.
+* The table provides a schema via `CREATE SCHEMA`. Column names
+in the schema must match those in the plugin config by name. The types in the
+provided schema are used instead of those specified in the plugin config. The
+schema allows you to specify the data type, and either nullable or `not null`
+cardinality.
+
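+For example, if the plugin config names the columns `year`, `month`, and `day`
+but gives no types, the provided schema can supply just the types (the table
+path below is illustrative):
+
+```
+CREATE SCHEMA (
+  `year` int not null,
+  `month` int not null,
+  `day` int not null)
+FOR TABLE dfs.example.myTable
+```
+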
+### Including the Regex in the Provided Schema
+
+Another way to use the provided schema is to define an empty plugin config;
+don't even provide the regex. Use table properties to define the regex (and
+the maximum error count, if desired).
+
+In this case:
+
+* Set the table property `drill.logRegex.regex` to the desired pattern.
+* Optionally set the table property `drill.logRegex.maxErrors` to the maximum
+error count.
+* Define column names and types via the provided schema. Columns must
+appear in the same order as the groups in the regex (they are matched by
+position).
+* If you have more groups than columns, Drill will fill in the missing
+columns as `field_n`, of type `VARCHAR` as described above.
+
+Example `CREATE SCHEMA` statement:
+
+```
+CREATE SCHEMA (
+ `year` int not null,
+ `month` int not null,
+ `day` int not null)
+FOR TABLE dfs.example.myTable
+PROPERTIES (
+ 'drill.logRegex.regex'='(\d\d\d\d)-(\d\d)-(\d\d) .*',
+ 'drill.logRegex.maxErrors'='10')
+```
+
+Use this model if you have a large number of log files with many different
+formats. You can store the format information in the table directory so that
+you don't have to create a plugin config for each.
+
+Optionally, you can create a workspace for each kind of log (which must
+be stored in distinct directory trees), and make your "blank" log format
+config the default config for that workspace.
-* **`_raw`**: This field returns the complete lines which matched your regex.
-* **`_unmatched_rows`**: This field returns rows which **did not** match the
regex. Note: This field ONLY returns the unmatching rows, so if you have a
data file of 10 lines, 8 of which match, `SELECT _unmatched_rows` will return 2
rows. If however, you combine this with another field, such as `_raw`, the
`_unmatched_rows` will be `null` when the rows match and have a value when it
does not.
+## Table Functions
+Log files come in many forms. To quickly query a new format, if you do not
+have a config, you can instead use a
+[table function](https://drill.apache.org/docs/plugin-configuration-basics/#using-the-formats-attributes-as-table-function-parameters).
+### Plugin Config Table Function
+You can use a table function to specify the same properties that you can set
+in the plugin config.
+
+Example:
+
+```
+SELECT * FROM table(dfs.example.myTable (
+ type => 'logRegex',
+ regex => '(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*',
+ maxErrors => 10))
+```
+
+The fields match those in the format configuration as shown above.
+
+* **type**: Must be `'logRegex'`. Required. Must be the first parameter.
+* **regex**: The regular expression. You must escape back-slashes with a second
+back-slash as shown in the example. (If you need to use a single-quote, escape
+it with a second single-quote.) Required.
+* **maxErrors**: As described above for the format plugin configuration.
+
+In this mode, you cannot specify a schema (table functions do not support a
+complex type such as the `schema` field). Instead, fields will appear in the
+`columns` array as explained earlier, and will have type `VARCHAR`.
+
+Nor can you use the `schema` property to specify an inline provided schema
+since, unfortunately, the `schema` config property conflicts with the `schema`
+table property field.
+
+### Schema Table Function
+
+The other table function option is to use a combination of a schema table
+function and a plugin config. The plugin config must at least specify the file
+extension so that Drill knows what config to use. See
+[DRILL-6965](https://issues.apache.org/jira/browse/DRILL-6965) for details.
+
+Example:
+
+```
+SELECT * FROM table(dfs.tf.table2(
+  schema=>'inline=(`year` int, `month` int, `day` int) properties {`drill.logRegex.regex`=`(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*`}'))
+```
+The quoted string for the `schema` value is the same as the SQL you would
+use to create a provided schema. The regex must be quoted as described earlier.
+Note the use of back-tick quotes to quote strings inside the quoted schema
+value.
+## Note to Developers
+This plugin is a variation of the one discussed in the book
+["Learning Apache Drill"](https://www.amazon.com/Learning-Apache-Drill-Analyze-Distributed/dp/1492032794),
+Chapter 12, *Writing a Format Plug-in*. This plugin is also the subject of a
+[tutorial on the "Enhanced Vector Framework"](https://github.com/paul-rogers/drill/wiki/Developer%27s-Guide-to-the-Enhanced-Vector-Framework).
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
index f40a84e..f478c03 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/TestSchemaWithTableFunction.java
@@ -17,6 +17,14 @@
*/
package org.apache.drill;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.util.StoragePluginTestUtils;
@@ -28,14 +36,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.io.File;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class TestSchemaWithTableFunction extends ClusterTest {
private static final String DATA_PATH = "store/text/data";
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
index babe3ab..e4da3bc 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArray.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.MockScanBuilder;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
import
org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
import
org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager.FileMetadataOptions;
import
org.apache.drill.exec.physical.impl.scan.project.ReaderSchemaOrchestrator;
@@ -102,7 +103,7 @@ public class TestColumnsArray extends SubOperatorTest {
// Table schema (columns: VARCHAR[])
TupleMetadata tableSchema = new SchemaBuilder()
- .addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+ .addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
mock.loader = mock.reader.makeTableLoader(tableSchema);
@@ -122,7 +123,7 @@ public class TestColumnsArray extends SubOperatorTest {
public void testColumnsArray() {
MockScanner mock =
buildScanner(RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
- ColumnsArrayManager.COLUMNS_COL,
+ ColumnsScanFramework.COLUMNS_COL,
ScanTestUtils.partitionColName(0)));
// Verify empty batch.
@@ -276,7 +277,7 @@ public class TestColumnsArray extends SubOperatorTest {
@Test
public void testMissingColumnsColumn() {
ScanSchemaOrchestrator scanner = buildScan(true,
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
.add("a", MinorType.VARCHAR)
@@ -297,10 +298,10 @@ public class TestColumnsArray extends SubOperatorTest {
@Test
public void testNotRepeated() {
ScanSchemaOrchestrator scanner = buildScan(true,
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
- .add(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+ .add(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
try {
@@ -322,10 +323,10 @@ public class TestColumnsArray extends SubOperatorTest {
@Test
public void testReqularCol() {
ScanSchemaOrchestrator scanner = buildScan(false,
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
TupleMetadata tableSchema = new SchemaBuilder()
- .add(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+ .add(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema();
ReaderSchemaOrchestrator reader = scanner.startReader();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
index 6643b3b..37f09a2 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayFramework.java
@@ -17,12 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.scan;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -33,7 +27,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixture;
import
org.apache.drill.exec.physical.impl.scan.ScanTestUtils.ScanFixtureBuilder;
import
org.apache.drill.exec.physical.impl.scan.TestFileScanFramework.DummyFileWork;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
import
org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
import
org.apache.drill.exec.physical.impl.scan.columns.ColumnsSchemaNegotiator;
import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
@@ -51,6 +45,12 @@ import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
/**
* Test the columns-array specific behavior in the columns scan framework.
@@ -186,13 +186,13 @@ public class TestColumnsArrayFramework extends
SubOperatorTest {
DummyColumnsReader reader = new DummyColumnsReader(
new SchemaBuilder()
- .addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+ .addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema());
// Create the scan operator
ColumnsScanFixtureBuilder builder = new ColumnsScanFixtureBuilder();
-
builder.setProjection(RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL));
+
builder.setProjection(RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL));
builder.addReader(reader);
builder.builder.requireColumnsArray(true);
ScanFixture scanFixture = builder.build();
@@ -220,15 +220,15 @@ public class TestColumnsArrayFramework extends
SubOperatorTest {
DummyColumnsReader reader = new DummyColumnsReader(
new SchemaBuilder()
- .addArray(ColumnsArrayManager.COLUMNS_COL, MinorType.VARCHAR)
+ .addArray(ColumnsScanFramework.COLUMNS_COL, MinorType.VARCHAR)
.buildSchema());
// Create the scan operator
ColumnsScanFixtureBuilder builder = new ColumnsScanFixtureBuilder();
builder.setProjection(Lists.newArrayList(
- SchemaPath.parseFromString(ColumnsArrayManager.COLUMNS_COL + "[1]"),
- SchemaPath.parseFromString(ColumnsArrayManager.COLUMNS_COL + "[3]")));
+ SchemaPath.parseFromString(ColumnsScanFramework.COLUMNS_COL + "[1]"),
+ SchemaPath.parseFromString(ColumnsScanFramework.COLUMNS_COL + "[3]")));
builder.addReader(reader);
builder.builder.requireColumnsArray(true);
ScanFixture scanFixture = builder.build();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
index 4429853..7e32869 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestColumnsArrayParser.java
@@ -26,8 +26,8 @@ import static org.junit.Assert.fail;
import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayManager;
import org.apache.drill.exec.physical.impl.scan.columns.ColumnsArrayParser;
+import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
import
org.apache.drill.exec.physical.impl.scan.columns.UnresolvedColumnsArrayColumn;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataColumn;
import org.apache.drill.exec.physical.impl.scan.file.FileMetadataManager;
@@ -52,14 +52,14 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
@Test
public void testColumnsArray() {
ScanLevelProjection scanProj = ScanLevelProjection.build(
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL),
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
assertFalse(scanProj.projectAll());
assertEquals(1, scanProj.requestedCols().size());
assertEquals(1, scanProj.columns().size());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(0).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(0).name());
// Verify column type
@@ -69,14 +69,14 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
@Test
public void testRequiredColumnsArray() {
ScanLevelProjection scanProj = ScanLevelProjection.build(
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL),
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
assertFalse(scanProj.projectAll());
assertEquals(1, scanProj.requestedCols().size());
assertEquals(1, scanProj.columns().size());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(0).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(0).name());
// Verify column type
@@ -93,7 +93,7 @@ public class TestColumnsArrayParser extends SubOperatorTest {
assertEquals(1, scanProj.requestedCols().size());
assertEquals(1, scanProj.columns().size());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(0).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(0).name());
// Verify column type
@@ -125,15 +125,15 @@ public class TestColumnsArrayParser extends
SubOperatorTest {
ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectList(
- ColumnsArrayManager.COLUMNS_COL + "[3]",
- ColumnsArrayManager.COLUMNS_COL + "[1]"),
+ ColumnsScanFramework.COLUMNS_COL + "[3]",
+ ColumnsScanFramework.COLUMNS_COL + "[1]"),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
assertFalse(scanProj.projectAll());
assertEquals(2, scanProj.requestedCols().size());
assertEquals(1, scanProj.columns().size());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(0).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(0).name());
// Verify column type
@@ -159,7 +159,7 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
public void testErrorColumnsArrayAndColumn() {
try {
ScanLevelProjection.build(
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL, "a"),
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL, "a"),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
fail();
} catch (UserException e) {
@@ -175,7 +175,7 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
public void testErrorColumnAndColumnsArray() {
try {
ScanLevelProjection.build(
- RowSetTestUtils.projectList("a", ColumnsArrayManager.COLUMNS_COL),
+ RowSetTestUtils.projectList("a", ColumnsScanFramework.COLUMNS_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
fail();
} catch (UserException e) {
@@ -191,7 +191,7 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
public void testErrorTwoColumnsArray() {
try {
ScanLevelProjection.build(
- RowSetTestUtils.projectList(ColumnsArrayManager.COLUMNS_COL,
ColumnsArrayManager.COLUMNS_COL),
+ RowSetTestUtils.projectList(ColumnsScanFramework.COLUMNS_COL,
ColumnsScanFramework.COLUMNS_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(false)));
fail();
} catch (UserException e) {
@@ -245,7 +245,7 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectList(ScanTestUtils.FILE_NAME_COL,
- ColumnsArrayManager.COLUMNS_COL,
+ ColumnsScanFramework.COLUMNS_COL,
ScanTestUtils.SUFFIX_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(true),
metadataManager.projectionParser()));
@@ -255,7 +255,7 @@ public class TestColumnsArrayParser extends SubOperatorTest
{
assertEquals(3, scanProj.columns().size());
assertEquals(ScanTestUtils.FILE_NAME_COL,
scanProj.columns().get(0).name());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(1).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(1).name());
assertEquals(ScanTestUtils.SUFFIX_COL, scanProj.columns().get(2).name());
// Verify column type
@@ -280,14 +280,14 @@ public class TestColumnsArrayParser extends
SubOperatorTest {
ScanLevelProjection scanProj = ScanLevelProjection.build(
RowSetTestUtils.projectList(
SchemaPath.DYNAMIC_STAR,
- ColumnsArrayManager.COLUMNS_COL),
+ ColumnsScanFramework.COLUMNS_COL),
ScanTestUtils.parsers(new ColumnsArrayParser(true)));
assertFalse(scanProj.projectAll());
assertEquals(2, scanProj.requestedCols().size());
assertEquals(1, scanProj.columns().size());
- assertEquals(ColumnsArrayManager.COLUMNS_COL,
scanProj.columns().get(0).name());
+ assertEquals(ColumnsScanFramework.COLUMNS_COL,
scanProj.columns().get(0).name());
// Verify column type
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
index 909e3de..ba93ce6 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/log/TestLogReader.java
@@ -17,16 +17,18 @@
*/
package org.apache.drill.exec.store.log;
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -34,12 +36,21 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryRowSetIterator;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+@Category(RowSetTests.class)
public class TestLogReader extends ClusterTest {
public static final String DATE_ONLY_PATTERN =
"(\\d\\d\\d\\d)-(\\d\\d)-(\\d\\d) .*";
@@ -47,6 +58,11 @@ public class TestLogReader extends ClusterTest {
@ClassRule
public static final BaseDirTestWatcher dirTestWatcher = new
BaseDirTestWatcher();
+ protected static File schemaAndConfigDir;
+ protected static File schemaOnlyDir;
+
+ private static File tableFuncDir;
+
@BeforeClass
public static void setup() throws Exception {
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
@@ -68,7 +84,7 @@ public class TestLogReader extends ClusterTest {
sampleConfig.setRegex(DATE_ONLY_PATTERN);
sampleConfig.setSchema();
- sampleConfig.getSchema().add( new LogFormatField("year","INT"));
+ sampleConfig.getSchema().add( new LogFormatField("year", "INT"));
sampleConfig.getSchema().add( new LogFormatField("month", "INT"));
sampleConfig.getSchema().add( new LogFormatField("day", "INT"));
@@ -81,12 +97,12 @@ public class TestLogReader extends ClusterTest {
"\\[([^]]*)] (\\w+)\\s+(\\S+) - (.*)");
logConfig.setSchema();
- logConfig.getSchema().add( new LogFormatField("year","INT"));
- logConfig.getSchema().add( new LogFormatField("month","INT"));
- logConfig.getSchema().add( new LogFormatField("day","INT"));
- logConfig.getSchema().add( new LogFormatField("hour","INT"));
- logConfig.getSchema().add( new LogFormatField("minute","INT"));
- logConfig.getSchema().add( new LogFormatField("second","INT"));
+ logConfig.getSchema().add( new LogFormatField("year", "INT"));
+ logConfig.getSchema().add( new LogFormatField("month", "INT"));
+ logConfig.getSchema().add( new LogFormatField("day", "INT"));
+ logConfig.getSchema().add( new LogFormatField("hour", "INT"));
+ logConfig.getSchema().add( new LogFormatField("minute", "INT"));
+ logConfig.getSchema().add( new LogFormatField("second", "INT"));
logConfig.getSchema().add( new LogFormatField("thread"));
logConfig.getSchema().add( new LogFormatField("level"));
logConfig.getSchema().add( new LogFormatField("module"));
@@ -120,6 +136,32 @@ public class TestLogReader extends ClusterTest {
pluginConfig.getFormats().put("date-log",logDateConfig);
pluginConfig.getFormats().put( "mysql-log", mysqlLogConfig);
pluginRegistry.createOrUpdate("cp", pluginConfig, false);
+
+ // Config similar to the above, but with no type info. Types
+ // will be provided via the provided schema mechanism. Column names
+ // are required so that the format and provided schemas match up.
+
+ LogFormatConfig untypedConfig = new LogFormatConfig();
+
+ untypedConfig.setExtension("logu");
+ untypedConfig.setRegex(DATE_ONLY_PATTERN);
+
+ untypedConfig.setSchema();
+ untypedConfig.getSchema().add( new LogFormatField("year"));
+ untypedConfig.getSchema().add( new LogFormatField("month"));
+ untypedConfig.getSchema().add( new LogFormatField("day"));
+
+ // Create a test directory we can write to.
+
+ schemaAndConfigDir = cluster.makeDataDir("sAndC", "logu", untypedConfig);
+
+ // Empty configuration: regex and columns defined in the
+ // provided schema
+
+ LogFormatConfig emptyConfig = new LogFormatConfig();
+ emptyConfig.setExtension("loge");
+ schemaOnlyDir = cluster.makeDataDir("SOnly", "loge", emptyConfig);
+ tableFuncDir = cluster.makeDataDir("tf", "logf", emptyConfig);
}
@Test
@@ -142,18 +184,21 @@ public class TestLogReader extends ClusterTest {
RowSetUtilities.verify(expected, results);
}
+ /**
+ * Tests for no crashes; does not validate results, unfortunately.
+ */
@Test
public void testWildcardLargeFile() throws RpcException {
String sql = "SELECT * FROM cp.`regex/large.log1`";
- List<QueryDataBatch> batches = client.queryBuilder().sql(sql).results();
+ QueryRowSetIterator iter = client.queryBuilder().sql(sql).rowSetIterator();
- for (QueryDataBatch queryDataBatch : batches) {
- queryDataBatch.release();
+ for (RowSet rowSet : iter) {
+ rowSet.clear();
}
}
@Test
- public void testExplicit() throws RpcException {
+ public void testExplicitProject() throws RpcException {
String sql = "SELECT `day`, `month` FROM cp.`regex/simple.log1`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
@@ -172,7 +217,7 @@ public class TestLogReader extends ClusterTest {
}
@Test
- public void testMissing() throws RpcException {
+ public void testMissingColumns() throws RpcException {
String sql = "SELECT `day`, `missing`, `month` FROM
cp.`regex/simple.log1`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
@@ -251,52 +296,25 @@ public class TestLogReader extends ClusterTest {
RowSetUtilities.verify(expected, results);
}
- //This section tests log queries without a defined schema
+ /**
+ * Test log queries without a defined schema using select *
+ */
+
@Test
public void testStarQueryNoSchema() throws RpcException {
String sql = "SELECT * FROM cp.`regex/mysql.sqllog`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .addNullable("field_0", MinorType.VARCHAR)
- .addNullable("field_1", MinorType.VARCHAR)
- .addNullable("field_2", MinorType.VARCHAR)
- .addNullable("field_3", MinorType.VARCHAR)
- .addNullable("field_4", MinorType.VARCHAR)
- .buildSchema();
-
- RowSet expected = client.rowSetBuilder(expectedSchema)
- .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on
test1")
- .addRow("070823", "21:00:48", "1", "Query", "show tables")
- .addRow("070823", "21:00:56", "1", "Query", "select * from category" )
- .addRow("070917", "16:29:01", "21", "Query","select * from location" )
- .addRow("070917", "16:29:12", "21", "Query","select * from location
where id = 1 LIMIT 1" )
- .build();
-
- //results.print();
- //expected.print();
- RowSetUtilities.verify(expected, results);
- }
-
- @Test
- public void testAllFieldsQueryNoSchema() throws RpcException {
- String sql = "SELECT field_0, field_1, field_2, field_3, field_4 FROM
cp.`regex/mysql.sqllog`";
- RowSet results = client.queryBuilder().sql(sql).rowSet();
-
- TupleMetadata expectedSchema = new SchemaBuilder()
- .addNullable("field_0", MinorType.VARCHAR)
- .addNullable("field_1", MinorType.VARCHAR)
- .addNullable("field_2", MinorType.VARCHAR)
- .addNullable("field_3", MinorType.VARCHAR)
- .addNullable("field_4", MinorType.VARCHAR)
+ .addArray("columns", MinorType.VARCHAR)
.buildSchema();
RowSet expected = client.rowSetBuilder(expectedSchema)
- .addRow("070823", "21:00:32", "1", "Connect", "root@localhost on
test1")
- .addRow("070823", "21:00:48", "1", "Query", "show tables")
- .addRow("070823", "21:00:56", "1", "Query", "select * from category" )
- .addRow("070917", "16:29:01", "21", "Query","select * from location" )
- .addRow("070917", "16:29:12", "21", "Query","select * from location
where id = 1 LIMIT 1" )
+ .addSingleCol(strArray("070823", "21:00:32", "1", "Connect",
"root@localhost on test1"))
+ .addSingleCol(strArray("070823", "21:00:48", "1", "Query", "show
tables"))
+ .addSingleCol(strArray("070823", "21:00:56", "1", "Query", "select *
from category" ))
+ .addSingleCol(strArray("070917", "16:29:01", "21", "Query","select *
from location" ))
+ .addSingleCol(strArray("070917", "16:29:12", "21", "Query","select *
from location where id = 1 LIMIT 1" ))
.build();
RowSetUtilities.verify(expected, results);
@@ -304,12 +322,12 @@ public class TestLogReader extends ClusterTest {
@Test
public void testSomeFieldsQueryNoSchema() throws RpcException {
- String sql = "SELECT field_0, field_4 FROM cp.`regex/mysql.sqllog`";
+ String sql = "SELECT columns[0], columns[4] FROM cp.`regex/mysql.sqllog`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .addNullable("field_0", MinorType.VARCHAR)
- .addNullable("field_4", MinorType.VARCHAR)
+ .addNullable("EXPR$0", MinorType.VARCHAR)
+ .addNullable("EXPR$1", MinorType.VARCHAR)
.buildSchema();
RowSet expected = client.rowSetBuilder(expectedSchema)
@@ -381,4 +399,341 @@ public class TestLogReader extends ClusterTest {
RowSetUtilities.verify(expected, results);
}
-}
\ No newline at end of file
+
+ /**
+ * Build a table, temporary for this test, using the table name and resource
+ * provided.
+ *
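+   * @param dir root directory of the test-specific workspace in which the
+   * table directory will be created
+   * @param ws name of the dfs workspace that corresponds to that directory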
+   * @param tableName name of the table within the given test-temporary
+   * workspace
+ * @param fileName name of the one and only file in the table (allows using
+ * plugin-specific extensions)
+ * @param resource path to an existing resource file to copy into the
+ * table as the given file name
+ * @return the SQL path for the table
+ * @throws IOException if the file operations fail
+ */
+
+ private String buildTable(File dir, String ws, String tableName,
+ String fileName, String resource) throws IOException {
+
+ // We need to create a schema file. Create a temporary test
+ // table.
+
+ File tableDir = new File(dir, tableName);
+ tableDir.mkdirs();
+
+ // Copy the "simple.log1" data file. Use a distinct extension
+ // configured above to provide field names but no types.
+
+ File dest = new File(tableDir, fileName);
+ URL url = getClass().getResource(resource);
+ FileUtils.copyURLToFile(url, dest);
+ return "dfs." + ws + "." + tableName;
+ }
+
+ @Test
+ public void testProvidedSchema() throws Exception {
+ String tablePath = buildTable(schemaAndConfigDir, "sAndC", "schema",
+ "sample.logu", "/regex/simple.log1");
+ try {
+
+ // Define the provided table schema
+
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+ String schemaSql = "create schema (`year` int not null, `month` int not
null, " +
+ "`day` int not null) " +
+ "for table " + tablePath;
+ run(schemaSql);
+
+ // Run a query using the provided schema.
+
+ String sql = "SELECT * FROM %s";
+ RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();
+
+ // Verify that the returned data used the schema.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("year", MinorType.INT)
+ .add("month", MinorType.INT)
+ .add("day", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(2017, 12, 17)
+ .addRow(2017, 12, 18)
+ .addRow(2017, 12, 19)
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+ /**
+ * Test the case in which the plugin config contains no information
+ * other than the file extensions. The regex is provided by the provided
+ * schema, but no columns are defined, so we use the columns[] array.
+ */
+
+ @Test
+ public void testSchemaOnlyNoCols() throws Exception {
+ String tablePath = buildTable(schemaOnlyDir, "sOnly", "noCols",
"sample.loge", "/regex/simple.log1");
+ try {
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+ String schemaSql = "create schema ()" +
+ " for table %s properties ('%s'='%s')";
+ run(schemaSql, tablePath, LogFormatPlugin.REGEX_PROP, DATE_ONLY_PATTERN);
+
+ String sql = "SELECT * FROM %s";
+ RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addSingleCol(strArray("2017", "12", "17"))
+ .addSingleCol(strArray("2017", "12", "18"))
+ .addSingleCol(strArray("2017", "12", "19"))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+ /**
+ * Test the case in which the plugin config contains no information
+ * other than the file extensions. The provided schema includes both
+ * the regex and the set of columns and types.
+ */
+
+ @Test
+ public void testSchemaOnlyWithCols() throws Exception {
+ String tablePath = buildTable(schemaOnlyDir, "sOnly", "withCols",
"sample.loge", "/regex/simple.log1");
+ try {
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+ String schemaSql = "create schema (`year` int not null, `month` int not
null, " +
+ "`day` int not null) " +
+ " for table " + tablePath +
+ " properties ('" + LogFormatPlugin.REGEX_PROP +
+ "'='" + DATE_ONLY_PATTERN + "')";
+ run(schemaSql);
+
+ String sql = "SELECT * FROM %s";
+ RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("year", MinorType.INT)
+ .add("month", MinorType.INT)
+ .add("day", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(2017, 12, 17)
+ .addRow(2017, 12, 18)
+ .addRow(2017, 12, 19)
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+ /**
+ * Corner case: provided schema has the regex and two of the three
+ * columns; the third takes a default name and type.
+ */
+
+ @Test
+ public void testSchemaOnlyWithMissingCols() throws Exception {
+ String tablePath = buildTable(schemaOnlyDir, "sOnly", "missingCols", "sample.loge", "/regex/simple.log1");
+ try {
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+ String schemaSql = "create schema (`year` int not null, `month` int not
null) " +
+ " for table " + tablePath +
+ " properties ('" + LogFormatPlugin.REGEX_PROP +
+ "'='" + DATE_ONLY_PATTERN + "')";
+ run(schemaSql);
+
+ String sql = "SELECT * FROM %s";
+ RowSet results = client.queryBuilder().sql(sql, tablePath).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("year", MinorType.INT)
+ .add("month", MinorType.INT)
+ .addNullable("field_2", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(2017, 12, "17")
+ .addRow(2017, 12, "18")
+ .addRow(2017, 12, "19")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+ /**
+ * Verify that an error is thrown if no pattern is provided in
+ * the plugin config, table function, or provided schema.
+ */
+
+ @Test
+ public void testEmptyPattern() throws Exception {
+ String tablePath = buildTable(tableFuncDir, "tf", "emptyRegex",
+ "sample.logf", "/regex/simple.log1");
+ try {
+ String sql = "SELECT * FROM %s";
+ client.queryBuilder().sql(sql, tablePath).run();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Regex property is required"));
+ }
+ }
+
+ /**
+ * Test the ability to use table functions to specify the regex.
+ */
+
+ @Test
+ public void testTableFunction() throws Exception {
+ String tablePath = buildTable(tableFuncDir, "tf", "table1",
+ "sample.logf", "/regex/simple.log1");
+
+ // Run a query using a table function.
+
+ String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
+ String sql = "SELECT * FROM table(%s(type => '%s', regex => '%s',
maxErrors => 10))";
+ RowSet results = client.queryBuilder().sql(sql, tablePath,
+ LogFormatPlugin.PLUGIN_NAME, escaped).rowSet();
+
+ // Verify that the returned data used the schema.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addArray("columns", MinorType.VARCHAR)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addSingleCol(strArray("2017", "12", "17"))
+ .addSingleCol(strArray("2017", "12", "18"))
+ .addSingleCol(strArray("2017", "12", "19"))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+
+ /**
+ * Test the use of a table function to provide the regex. Verify
+ * that the plugin throws an error if no groups are defined.
+ */
+
+ @Test
+ public void testTableFunctionNoGroups() throws Exception {
+ String tablePath = buildTable(tableFuncDir, "tf", "noGroups",
+ "sample.logf", "/regex/simple.log1");
+
+ // Use a table function to pass in a regex without a group.
+
+ try {
+ String sql = "SELECT * FROM table(%s(type => '%s', regex => '''foo'''))";
+ client.queryBuilder().sql(sql, tablePath, LogFormatPlugin.PLUGIN_NAME).run();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Regex property has no groups"));
+ }
+ }
+
+ /**
+ * Test the use of the schema table function to provide a schema
+ * including types. In this form, the regex must be provided by the
+ * plugin config or (as in this test) as table properties.
+ */
+
+ @Test
+ public void testTableFunctionWithSchema() throws Exception {
+ String tablePath = buildTable(tableFuncDir, "tf", "table2",
+ "sample.logf", "/regex/simple.log1");
+ try {
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+
+ // Run a query using a table function.
+
+ String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
+ String sql = "SELECT * FROM table(%s(" +
+ "schema=>'inline=(`year` int, `month` int, `day` int) properties
{`%s`=`%s`}'))";
+ RowSet results = client.queryBuilder().sql(sql, tablePath,
+ LogFormatPlugin.REGEX_PROP, escaped).rowSet();
+
+ // Verify that the returned data used the schema.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("year", MinorType.INT)
+ .addNullable("month", MinorType.INT)
+ .addNullable("day", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(2017, 12, 17)
+ .addRow(2017, 12, 18)
+ .addRow(2017, 12, 19)
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+ /**
+ * Test that a schema can be provided in a table function that also includes
+ * plugin config. This case fails for the log format plugin because,
unfortunately,
+ * the log format plugin has a config field called "schema" which is not a
string
+ * and is found by the code before trying to treat "schema" as a schema.
+ * So, this test is disabled.
+ */
+
+ @Test
+ @Ignore("Use of schema conflicts with plugin field")
+ public void testTableFunctionWithConfigAndSchema() throws Exception {
+ String tablePath = buildTable(tableFuncDir, "tf", "table2",
+ "sample.logf", "/regex/simple.log1");
+ try {
+ client.alterSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE, true);
+
+ // Run a query using a table function.
+
+ String escaped = DATE_ONLY_PATTERN.replace("\\", "\\\\");
+ String sql = "SELECT * FROM table(%s(type => '%s', regex => '%s', " +
+ "schema=>'inline=(`year` int, `month` int, `day` int)'))";
+ RowSet results = client.queryBuilder().sql(sql, tablePath,
+ LogFormatPlugin.PLUGIN_NAME, escaped).rowSet();
+
+ // Verify that the returned data used the schema.
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("year", MinorType.INT)
+ .addNullable("month", MinorType.INT)
+ .addNullable("day", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = client.rowSetBuilder(expectedSchema)
+ .addRow(2017, 12, 17)
+ .addRow(2017, 12, 18)
+ .addRow(2017, 12, 19)
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ } finally {
+ client.resetSession(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE);
+ }
+ }
+
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
index c77574a..b86b548 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/AbstractPropertied.java
@@ -95,4 +95,20 @@ public class AbstractPropertied implements Propertied {
setProperty(key, null);
}
}
+
+ @Override
+ public int intProperty(String key) {
+ return intProperty(key, 0);
+ }
+
+ @Override
+ public int intProperty(String key, int defaultValue) {
+ String value = property(key);
+ return value == null ? defaultValue : Integer.parseInt(value);
+ }
+
+ @Override
+ public void setIntProperty(String key, int value) {
+ setProperty(key, Integer.toString(value));
+ }
}
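
As a rough usage sketch (not from this patch), the new integer accessors mirror the existing boolean ones: values are stored as strings and parsed on read, falling back to a caller-supplied default when the key is absent. The snippet below assumes a TupleMetadata built with SchemaBuilder (TupleMetadata extends Propertied); the "drill.example.*" keys are invented purely for illustration.

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class IntPropertySketch {
      public static void main(String[] args) {
        // Any Propertied object works; a schema is a natural example because
        // CREATE SCHEMA table properties end up stored this way.
        TupleMetadata schema = new SchemaBuilder()
            .add("year", MinorType.INT)
            .buildSchema();

        schema.setIntProperty("drill.example.maxErrors", 10);            // stored as the string "10"
        int maxErrors = schema.intProperty("drill.example.maxErrors");   // parsed back to 10
        int fallback = schema.intProperty("drill.example.missing", -1);  // absent key -> default
        System.out.println(maxErrors + ", " + fallback);                 // 10, -1
      }
    }
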
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
index c13adb3..1597ab1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/Propertied.java
@@ -47,5 +47,26 @@ public interface Propertied {
boolean booleanProperty(String key);
boolean booleanProperty(String key, boolean defaultValue);
void setBooleanProperty(String key, boolean value);
+ int intProperty(String key);
+ int intProperty(String key, int defaultValue);
+ void setIntProperty(String key, int value);
+
+ /**
+ * Drill-wide properties are of the form:<br><tt>
+ * drill.<prop name></tt><br>
+ * While plugin-specific properties are of the form:<br><tt>
+ * drill.<plugin name>.<config prop name></tt><br>
+ * Where "config prop name" is the field name in the plugin config.
+ * <p>
+ * This function generates the "drill.<plugin name>." prefix.
+ *
+ * @param pluginName name used in the "type" field of the plugin
+ * config
+ * @return the "drill.<plugin name>." prefix
+ */
+
+ public static String pluginPrefix(String pluginName) {
+ return DRILL_PROP_PREFIX + pluginName + ".";
+ }
}
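
As a rough sketch (not from this patch) of the key layout pluginPrefix() produces: assuming the log plugin's type name is "logRegex" and its regex config field is named "regex", as the tests above suggest, the composed key would be "drill.logRegex.regex".

    import org.apache.drill.exec.record.metadata.Propertied;

    public class PluginPrefixSketch {
      public static void main(String[] args) {
        // pluginPrefix() prepends the Drill-wide "drill." prefix and appends a trailing dot.
        String key = Propertied.pluginPrefix("logRegex") + "regex";
        System.out.println(key);  // drill.logRegex.regex
      }
    }

A provided schema can then carry this key as a table property, which is what the CREATE SCHEMA ... PROPERTIES statements in the tests above exercise.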