Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/1114#discussion_r167131709
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.log;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class LogRecordReader extends AbstractRecordReader {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LogRecordReader.class);
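+ // Upper bound on the number of rows written to a single batch in next()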
+ private static final int MAX_RECORDS_PER_BATCH = 8096;
+
+ private String inputPath;
+ private BufferedReader reader;
+ private DrillBuf buffer;
+ private VectorContainerWriter writer;
+ private LogFormatPlugin.LogFormatConfig config;
+ private int lineCount;
+ private Pattern r;
+
+ private List<String> fieldNames;
+ private List<String> dataTypes;
+ private boolean errorOnMismatch;
+ private String dateFormat;
+ private String timeFormat;
+ private java.text.DateFormat df;
+ private java.text.DateFormat tf;
+ private long time;
+
+ public LogRecordReader(FragmentContext fragmentContext, String inputPath, DrillFileSystem fileSystem,
+ List<SchemaPath> columns, LogFormatPlugin.LogFormatConfig config) throws OutOfMemoryException {
+ try {
+ Path hdfsPath = new Path(inputPath);
+ Configuration conf = new Configuration();
+ FSDataInputStream fsStream = fileSystem.open(hdfsPath);
+ CompressionCodecFactory factory = new CompressionCodecFactory(conf);
+ CompressionCodec codec = factory.getCodec(hdfsPath);
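+ // The factory infers the codec (gzip, bzip2, etc.) from the file extension;
+ // a null codec means the file is uncompressed plain text.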
+ if (codec == null) {
+ reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), "UTF-8"));
+ } else {
+ CompressionInputStream comInputStream = codec.createInputStream(fsStream.getWrappedStream());
+ reader = new BufferedReader(new InputStreamReader(comInputStream, "UTF-8"));
+ }
+ this.inputPath = inputPath;
+ this.lineCount = 0;
+ this.config = config;
+ this.buffer = fragmentContext.getManagedBuffer(4096);
+ setColumns(columns);
+
+ } catch (IOException e) {
+ throw UserException.dataReadError(e).message("Failed to open input file: " + inputPath).build(logger);
+ }
+ }
+
+ public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+ this.writer = new VectorContainerWriter(output);
+ String regex = config.getPattern();
+
+ fieldNames = config.getFieldNames();
+ dataTypes = config.getDataTypes();
+ dateFormat = config.getDateFormat();
+ timeFormat = config.getTimeFormat();
+ errorOnMismatch = config.getErrorOnMismatch();
+
+ /*
+ This section will check for:
+ 1. Empty regex
+ 2. Invalid Regex
+ 3. Empty date string if the date format is used
+ 4. No capturing groups in the regex
+ 5. Incorrect number of data types
+ 6. Invalid data types
+ */
+ if (regex.isEmpty()) {
+ throw UserException.parseError().message("Log parser requires a
valid, non-empty regex in the plugin configuration").build(logger);
+ } else {
+ //TODO Check for invalid regex
+ r = Pattern.compile(regex);
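+ // Match a dummy string only to obtain groupCount() for validating the configuration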
+ Matcher m = r.matcher("test");
+ if (m.groupCount() == 0) {
+ throw UserException.parseError().message("Invalid Regular
Expression: No Capturing Groups", 0).build(logger);
+ } else if (m.groupCount() != (fieldNames.size())) {
+ throw UserException.parseError().message("Invalid Regular
Expression: Field names do not match capturing groups. There are " +
m.groupCount() + " captured groups in the data and " + fieldNames.size() + "
specified in the configuration.", 0).build(logger);
+
+ } else if ((dataTypes == null) || m.groupCount() != dataTypes.size()) {
+ //If the number of data types is not correct, create a list of varchar
+ dataTypes = new ArrayList<String>();
+ for (int i = 0; i < m.groupCount(); i++) {
+ dataTypes.add("VARCHAR");
+ }
+ }
+ }
+
+ //Check and set up date formats
+ if (dataTypes.contains("DATE") || dataTypes.contains("TIMESTAMP")) {
+ if (dateFormat != null && !dateFormat.isEmpty()) {
+ df = new java.text.SimpleDateFormat(dateFormat);
+ } else {
+ throw UserException.parseError().message("Invalid date format.
The date formatting string was empty. Please specify a valid date format
string in the configuration for this data source.", 0).build(logger);
+ }
+ }
+
+ if (dataTypes.contains("TIME")) {
+ if (timeFormat != null && !timeFormat.isEmpty()) {
+ tf = new java.text.SimpleDateFormat(timeFormat);
+ } else {
+ throw UserException.parseError().message("Invalid time format.
The time formatting string was empty. Please specify a valid time format
string in the configuration for this data source.", 0).build(logger);
+ }
+ }
+
+ }
+
+ public int next() {
+ this.writer.allocate();
+ this.writer.reset();
+
+ int recordCount = 0;
+
+ try {
+ BaseWriter.MapWriter map = this.writer.rootAsMap();
+ String line = null;
+
+ while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
+ lineCount++;
+
+ // Skip empty lines
+ if (line.trim().length() == 0) {
+ continue;
+ }
+
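+ // Point the writer at this record's row slot before writing its columns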
+ this.writer.setPosition(recordCount);
+ map.start();
+
+ Matcher m = r.matcher(line);
+ if (m.find()) {
+ for (int i = 1; i <= m.groupCount(); i++) {
--- End diff --
Why a `1`-based loop with the funny `i - 1` indexing? Just do `i + 1` as
needed to fetch the group.
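
Something like this, as a sketch (assuming `fieldNames` lines up with the capture groups; the actual vector write is elided since the loop body is truncated above):

```java
// Iterate 0-based over the field list; Matcher groups are 1-based,
// so apply the offset only where the group is fetched.
for (int i = 0; i < m.groupCount(); i++) {
  String fieldName = fieldNames.get(i); // direct index, no i - 1
  String value = m.group(i + 1);        // group(0) is the entire match
  // ... write value into the vector for fieldName
}
```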
---