vdiravka commented on a change in pull request #2089:
URL: https://github.com/apache/drill/pull/2089#discussion_r563393254



##########
File path: contrib/storage-splunk/README.md
##########
@@ -0,0 +1,152 @@
+# Drill Connector for Splunk
+This plugin enables Drill to query Splunk. 
+
+## Configuration
+To connect Drill to Splunk, create a new storage plugin with the following 
configuration:
+
+Splunk exposes its management interface on port `8089` by default.  This port 
must be open for Drill to query Splunk. 
+
+```json
+{
+   "type":"splunk",
+   "username": "admin",
+   "password": "changeme",
+   "hostname": "localhost",
+   "port": 8089,
+   "earliestTime": "-14d",
+   "latestTime": "now",
+   "enabled": false
+}
+```
+
+## Understanding Splunk's Data Model
+Splunk's primary use case is analyzing event logs with a timestamp. As such, 
data is indexed by the timestamp, with the most recent data being indexed 
first.  By default, Splunk
+ will sort the data in reverse chronological order.  Large Splunk 
installations will put older data into buckets of hot, warm and cold storage 
with the "cold" storage on the
+  slowest and cheapest disks.
+  
+With this understood, it is **very** important to put time boundaries on your 
Splunk queries. The Drill plugin allows you to set default values in the 
configuration such that every
+ query you run will be bounded by these boundaries.  Alternatively, you can 
set the time boundaries at query time.  In either case, you will achieve the 
best performance when
+  you are asking Splunk for the smallest amount of data possible.
+  
+## Understanding Drill's Data Model with Splunk
+Drill treats Splunk indexes as tables. Splunk's access model does not restrict 
access to the catalog, but does restrict access to the actual data. It is therefore 
possible that you can
+ see the names of indexes to which you do not have access.  You can view the 
list of available indexes with a `SHOW TABLES IN splunk` query.
+  
+```
+apache drill> SHOW TABLES IN splunk;
++--------------+----------------+
+| TABLE_SCHEMA |   TABLE_NAME   |
++--------------+----------------+
+| splunk       | summary        |
+| splunk       | splunklogger   |
+| splunk       | _thefishbucket |
+| splunk       | _audit         |
+| splunk       | _internal      |
+| splunk       | _introspection |
+| splunk       | main           |
+| splunk       | history        |
+| splunk       | _telemetry     |
++--------------+----------------+
+9 rows selected (0.304 seconds)
+```
+To query Splunk from Drill, use the following format: 
+```sql
+SELECT <fields>
+FROM splunk.<index>
+```
+  
+ ## Bounding Your Queries
+  When you learn to query Splunk via their interface, the first thing you 
learn is to bound your queries so that they are looking at the shortest time 
span possible. When using
+   Drill to query Splunk, it is advisable to do the same thing, and Drill 
offers two ways to accomplish this: via the configuration and at query time.
+   
+  ### Bounding your Queries at Query Time
+  The easiest way to bound your query is to do so at query time via special 
filters in the `WHERE` clause. There are two special fields, `earliestTime` and 
`latestTime` which can
+   be set to bound the query. If they are not set, the query will be bounded 
to the defaults set in the configuration.
+   
+   You can use any of the time formats specified in the Splunk documentation 
here:   
+  
https://docs.splunk.com/Documentation/Splunk/8.0.3/SearchReference/SearchTimeModifiers
+  
+  So if you wanted to see your data for the last 15 minutes, you could execute 
the following query:
+
+```sql
+SELECT <fields>
+FROM splunk.<index>
+WHERE earliestTime='-15m' AND latestTime='now'
+```
+The variables set in a query override the defaults from the configuration. 
+  
+ ## Data Types
+  Splunk does not have sophisticated data types and unfortunately does not 
provide metadata from its query results.  With the exception of the fields 
below, Drill will interpret
+   all fields as `VARCHAR` and hence you will have to convert them to the 
appropriate data type at query time.
+  
+  #### Timestamp Fields
+  * `_indextime`
+  * `_time` 
+  
+  #### Numeric Fields
+  * `date_hour` 
+  * `date_mday`
+  * `date_minute`
+  * `date_second` 
+  * `date_year`
+  * `linecount`
+  
+ ### Nested Data
+ Splunk has two different types of nested data which roughly map to Drill's 
`LIST` and `MAP` data types. Unfortunately, there is no easy way to identify 
whether a field is a
+  nested field at query time as Splunk does not provide any metadata and 
therefore all fields are treated as `VARCHAR`.
+  
+  However, Drill does have built in functions to easily convert Splunk 
multifields into Drill `LIST` and `MAP` data types. For a LIST, simply use the 
+  `SPLIT(<field>, ' ')` function to split the field into a `LIST`.
+  
+  `MAP` data types are rendered as JSON in Splunk. Fortunately JSON can easily 
be parsed into a Drill Map by using the `convert_fromJSON()` function.  The 
query below
+   demonstrates how to convert a JSON column into a Drill `MAP`.
+  
+```sql
+SELECT convert_fromJSON(_raw) 
+FROM splunk.spl
+WHERE spl = '| makeresults
+| eval _raw="{\"pc\":{\"label\":\"PC\",\"count\":24,\"peak24\":12},\"ps3\":
+{\"label\":\"PS3\",\"count\":51,\"peak24\":10},\"xbox\":
+{\"label\":\"XBOX360\",\"count\":40,\"peak24\":11},\"xone\":
+{\"label\":\"XBOXONE\",\"count\":105,\"peak24\":99},\"ps4\":
+{\"label\":\"PS4\",\"count\":200,\"peak24\":80}}"'
+```
+
+### Selecting Fields
+When you execute a query in Drill for Splunk, the fields you select are pushed 
down to Splunk. Therefore, it will always be more efficient to explicitly 
specify fields to push
+ down to Splunk rather than using `SELECT *` queries.
+ 
+ ### Special Fields
+ There are several fields which can be included in a Drill query 
+ 
+ * `spl`:  If you just want to send an SPL query to Splunk, this will do that. 
+ * `earliestTime`: Overrides the `earliestTime` setting in the configuration. 
+ * `latestTime`: Overrides the `latestTime` setting in the configuration. 
+  
+### Sorting Results
+Due to the nature of Splunk indexes, data will always be returned in reverse 
chronological order. Thus, sorting is not necessary if that is the desired 
order.
+
+## Sending Arbitrary SPL to Splunk
+There is a special table called `spl` which you can use to send arbitrary 
queries to Splunk. If you use this table, you must include a query in the `spl` 
filter as shown below:
+```sql
+SELECT *
+FROM splunk.spl
+WHERE spl='<your SPL query>'
+```
+
+# Testing the Plugin
+This plugin includes a series of unit tests in the `src/test/` directory, 
however you will need an active Splunk installation to run them.  Since Splunk 
is not an open source
+ project, nor is available as a Docker container, simply follow the 
instructions below to test Splunk with Drill.

Review comment:
       It looks like it is possible, and easier, to set up Splunk via a Docker 
container. The doc can be updated here.

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SplunkBatchReader.class);
+  private static final List<String> INT_COLS = new 
ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", 
"date_second", "date_year", "linecount"));
+  private static final List<String> TS_COLS = new 
ArrayList<>(Arrays.asList("_indextime", "_time"));
+  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  private static final String LATEST_TIME_COLUMN = "latestTime";
+
+  private final SplunkPluginConfig config;
+  private final SplunkSubScan subScan;
+  private final List<SchemaPath> projectedColumns;
+  private final Service splunkService;
+  private final SplunkScanSpec subScanSpec;
+  private final CsvParserSettings csvSettings;
+  private JobExportArgs exportArgs;
+  private InputStream searchResults;
+  private CsvParser csvReader;
+  private String[] firstRow;
+  private CustomErrorContext errorContext;
+
+  private List<SplunkColumnWriter> columnWriters;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private Stopwatch timer;
+
+
+  public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.subScanSpec = subScan.getScanSpec();
+    SplunkConnection connection = new SplunkConnection(config);
+    this.splunkService = connection.connect();
+
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+    RowListProcessor rowProcessor = new RowListProcessor();
+    csvSettings.setProcessor(rowProcessor);
+    csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    timer = Stopwatch.createUnstarted();
+    timer.start();
+
+    this.errorContext = negotiator.parentErrorContext();
+
+    String queryString = buildQueryString();
+
+    logger.debug("Query Sent to Splunk: {}", queryString);
+    // Execute the query
+    searchResults = splunkService.export(queryString, exportArgs);
+    logger.debug("Time to execute query: {} milliseconds", 
timer.elapsed().toMillis());
+
+    /*
+    Splunk produces poor output from the API.  Of the available choices, CSV 
was the easiest to deal with.  Unfortunately,
+    the data is not consistent, as some fields are quoted, some are not.
+    */
+    this.csvReader = new CsvParser(csvSettings);
+    logger.debug("Time to open CSV Parser: {} milliseconds", 
timer.elapsed().toMillis());
+    csvReader.beginParsing(searchResults, "utf-8");
+    logger.debug("Time to open input stream: {} milliseconds", 
timer.elapsed().toMillis());
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, false);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    logger.debug("Completed open function in {} milliseconds", 
timer.elapsed().toMillis());
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    timer.stop();
+    if (searchResults != null) {
+      AutoCloseables.closeSilently(searchResults);
+      searchResults = null;
+    }
+  }
+
+  /**
+   * Splunk returns the data in CSV format with some fields escaped and some 
not.  Splunk does
+   * not have the concept of datatypes, or at least does not make the metadata 
available in the API, so
+   * the best solution is to provide a list of columns that are known to be a 
specific data type such as _time,
+   * indextime, the various date components etc and map those as the 
appropriate columns.  Then map everything else as a string.
+   */
+  private TupleMetadata buildSchema() {
+
+    this.firstRow = csvReader.parseNext();
+
+    // Case for empty dataset
+    if (firstRow == null) {
+      return builder.buildSchema();
+    }
+
+    // Parse the first row
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.INT);
+      } else if (TS_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.TIMESTAMP);
+      } else {
+        try {
+          builder.addNullable(value, MinorType.VARCHAR);
+        } catch (Exception e) {
+          logger.warn("Splunk attempted to add duplicate column {}", value);

Review comment:
       Are there no other possible exceptions in `builder.addNullable(value, 
MinorType.VARCHAR);`? Catching `Exception` here would report any failure as a 
duplicate column.
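
       One way to avoid the broad catch might be to detect repeated header 
names before touching the schema builder. The following is only a minimal 
sketch under that assumption: `DuplicateColumnFilter` and `uniqueColumns` are 
hypothetical names that are not part of this PR, and names are compared 
exactly, which may not match how Drill compares column names.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: separates unique header names
// from repeats so that no exception handling is needed for duplicates.
public class DuplicateColumnFilter {

  /**
   * Returns the header names in their original order with repeats dropped.
   * Every dropped name is appended to {@code duplicates} for logging.
   * Names are compared exactly (case-sensitive), which is an assumption.
   */
  public static List<String> uniqueColumns(String[] header, List<String> duplicates) {
    Set<String> seen = new LinkedHashSet<>();
    for (String name : header) {
      // Set.add returns false when the name is already present.
      if (!seen.add(name)) {
        duplicates.add(name);
      }
    }
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    List<String> duplicates = new ArrayList<>();
    List<String> columns = uniqueColumns(
        new String[] {"_time", "host", "source", "host"}, duplicates);
    System.out.println("columns=" + columns + " duplicates=" + duplicates);
  }
}
```

       With something like this, the loop in `buildSchema()` could iterate over 
the unique names only and log the duplicates separately, so any other exception 
from `addNullable` would surface instead of being swallowed.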

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanBatchCreator.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import 
org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework.ScanFrameworkBuilder;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class SplunkScanBatchCreator implements BatchCreator<SplunkSubScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context,
+                                       SplunkSubScan subScan, 
List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+
+    try {
+      ScanFrameworkBuilder builder = createBuilder(context.getOptions(), 
subScan);
+      return builder.buildScanOperator(context, subScan);
+    } catch (UserException e) {
+      // Rethrow user exceptions directly
+      throw e;
+    } catch (Throwable e) {
+      // Wrap all others
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private ScanFrameworkBuilder createBuilder(OptionManager options, 
SplunkSubScan subScan) {
+    SplunkPluginConfig config = subScan.getConfig();
+    ScanFrameworkBuilder builder = new ScanFrameworkBuilder();
+    builder.projection(subScan.getColumns());
+    builder.setUserName(subScan.getUserName());
+
+    // Reader
+    ReaderFactory readerFactory = new SplunkReaderFactory(config, subScan);
+    builder.setReaderFactory(readerFactory);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
+  }
+
+  private static class SplunkReaderFactory implements ReaderFactory {
+
+    private final SplunkPluginConfig config;
+    private final SplunkSubScan subScan;
+    private int count;
+
+    public SplunkReaderFactory(SplunkPluginConfig config, SplunkSubScan 
subScan) {
+      this.config = config;
+      this.subScan = subScan;
+    }
+
+    @Override
+    public void bind(ManagedScanFramework framework) {
+    }
+
+    @Override
+    public ManagedReader<SchemaNegotiator> next() {
+      // Only a single scan (in a single thread)
+      if (count++ == 0) {
+        return new SplunkBatchReader(config, subScan);
+      }
+      return null;
+    }
+  }
+}

Review comment:
       ```suggestion
   }
   
   ```

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class SplunkGroupScan extends AbstractGroupScan {
+
+  private final SplunkPluginConfig config;
+  private final List<SchemaPath> columns;
+  private final SplunkScanSpec splunkScanSpec;
+  private final Map<String, ExprNode.ColRelOpConstNode> filters;
+  private final ScanStats scanStats;
+  private final double filterSelectivity;
+  private final int maxRecords;
+
+  private int hashCode;
+
+  /**
+   * Creates a new group scan from the storage plugin.
+   */
+  public SplunkGroupScan (SplunkScanSpec scanSpec) {
+    super("no-user");
+    this.splunkScanSpec = scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = ALL_COLUMNS;
+    this.filters = null;
+    this.filterSelectivity = 0.0;
+    this.maxRecords = -1;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Copies the group scan during many stages of Calcite operation.
+   */
+  public SplunkGroupScan(SplunkGroupScan that) {
+    super(that);
+    this.config = that.config;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.columns = that.columns;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+
+    // Calcite makes many copies in the later stage of planning
+    // without changing anything. Retain the previous stats.
+    this.scanStats = that.scanStats;
+  }
+
+  /**
+   * Applies columns. Oddly called multiple times, even when
+   * the scan already has columns.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.columns = columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Oddly called later in planning, after earlier assigning columns,
+    // to again assign columns. Retain filters, but compute new stats.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Adds a filter to the scan.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, Map<String, 
ExprNode.ColRelOpConstNode> filters,
+                       double filterSelectivity) {
+    super(that);
+    this.columns = that.columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Applies a filter.
+    this.filters = filters;
+    this.filterSelectivity = filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Deserialize a group scan. Not called in normal operation. Probably used
+   * only if Drill executes a logical plan.
+   */
+  @JsonCreator
+  public SplunkGroupScan(
+    @JsonProperty("config") SplunkPluginConfig config,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("splunkScanSpec") SplunkScanSpec splunkScanSpec,
+    @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords
+  ) {
+    super("no-user");
+    this.config = config;
+    this.columns = columns;
+    this.splunkScanSpec = splunkScanSpec;
+    this.filters = filters;
+    this.filterSelectivity = selectivity;
+    this.maxRecords = maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Adds a limit to the group scan
+   * @param that Previous SplunkGroupScan
+   * @param maxRecords the limit pushdown
+   */
+  public SplunkGroupScan(SplunkGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    // Apply the limit
+    this.maxRecords = maxRecords;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+  }
+
+  @JsonProperty("config")
+  public SplunkPluginConfig config() { return config; }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() { return columns; }
+
+  @JsonProperty("splunkScanSpec")
+  public SplunkScanSpec splunkScanSpec() { return splunkScanSpec; }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> filters() { return filters; }
+
+  @JsonProperty("maxRecords")
+  public int maxRecords() { return maxRecords; }
+
+  @JsonProperty("filterSelectivity")
+  public double selectivity() { return filterSelectivity; }
+
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> 
endpoints) { }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new SplunkSubScan(config, splunkScanSpec, columns, filters, 
maxRecords);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;

Review comment:
       Is there no way to parallelize this in the future?

##########
File path: contrib/storage-splunk/README.md
##########
@@ -0,0 +1,152 @@
+# Drill Connector for Splunk
+This plugin enables Drill to query Splunk. 
+
+## Configuration
+To connect Drill to Splunk, create a new storage plugin with the following 
configuration:
+
+Splunk exposes its management interface on port `8089` by default.  This port 
must be open for Drill to query Splunk. 
+
+```json
+{
+   "type":"splunk",
+   "username": "admin",
+   "password": "changeme",
+   "hostname": "localhost",
+   "port": 8089,
+   "earliestTime": "-14d",
+   "latestTime": "now",
+   "enabled": false
+}
+```
+
+## Understanding Splunk's Data Model
+Splunk's primary use case is analyzing event logs with a timestamp. As such, 
data is indexed by the timestamp, with the most recent data being indexed 
first.  By default, Splunk
+ will sort the data in reverse chronological order.  Large Splunk 
installations will put older data into buckets of hot, warm and cold storage 
with the "cold" storage on the
+  slowest and cheapest disks.
+  
+With this understood, it is **very** important to put time boundaries on your 
Splunk queries. The Drill plugin allows you to set default values in the 
configuration such that every
+ query you run will be bounded by these boundaries.  Alternatively, you can 
set the time boundaries at query time.  In either case, you will achieve the 
best performance when
+  you are asking Splunk for the smallest amount of data possible.
+  
+## Understanding Drill's Data Model with Splunk
+Drill treats Splunk indexes as tables. Splunk's access model does not restrict 
access to the catalog, but does restrict access to the actual data. It is therefore 
possible that you can
+ see the names of indexes to which you do not have access.  You can view the 
list of available indexes with a `SHOW TABLES IN splunk` query.
+  
+```
+apache drill> SHOW TABLES IN splunk;
++--------------+----------------+
+| TABLE_SCHEMA |   TABLE_NAME   |
++--------------+----------------+
+| splunk       | summary        |
+| splunk       | splunklogger   |
+| splunk       | _thefishbucket |
+| splunk       | _audit         |
+| splunk       | _internal      |
+| splunk       | _introspection |
+| splunk       | main           |
+| splunk       | history        |
+| splunk       | _telemetry     |
++--------------+----------------+
+9 rows selected (0.304 seconds)
+```
+To query Splunk from Drill, use the following format: 
+```sql
+SELECT <fields>
+FROM splunk.<index>
+```
+  
+ ## Bounding Your Queries
+  When you learn to query Splunk via their interface, the first thing you 
learn is to bound your queries so that they are looking at the shortest time 
span possible. When using
+   Drill to query Splunk, it is advisable to do the same thing, and Drill 
offers two ways to accomplish this: via the configuration and at query time.
+   
+  ### Bounding your Queries at Query Time
+  The easiest way to bound your query is to do so at query time via special 
filters in the `WHERE` clause. There are two special fields, `earliestTime` and 
`latestTime` which can
+   be set to bound the query. If they are not set, the query will be bounded 
to the defaults set in the configuration.
+   
+   You can use any of the time formats specified in the Splunk documentation 
here:   
+  
https://docs.splunk.com/Documentation/Splunk/8.0.3/SearchReference/SearchTimeModifiers
+  
+  So if you wanted to see your data for the last 15 minutes, you could execute 
the following query:
+
+```sql
+SELECT <fields>
+FROM splunk.<index>
+WHERE earliestTime='-15m' AND latestTime='now'
+```
+The variables set in a query override the defaults from the configuration. 
+  
+ ## Data Types
+  Splunk does not have sophisticated data types and unfortunately does not 
provide metadata from its query results.  With the exception of the fields 
below, Drill will interpret
+   all fields as `VARCHAR` and hence you will have to convert them to the 
appropriate data type at query time.
+  
+  #### Timestamp Fields
+  * `_indextime`
+  * `_time` 
+  
+  #### Numeric Fields
+  * `date_hour` 
+  * `date_mday`
+  * `date_minute`
+  * `date_second` 
+  * `date_year`
+  * `linecount`
+  
+ ### Nested Data
+ Splunk has two different types of nested data which roughly map to Drill's 
`LIST` and `MAP` data types. Unfortunately, there is no easy way to identify 
whether a field is a
+  nested field at query time as Splunk does not provide any metadata and 
therefore all fields are treated as `VARCHAR`.

Review comment:
       What about Schema Provisioning and the Metastore? Is it possible to set 
the schema for Splunk?

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
+  private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
+  private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
+  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  private static final String LATEST_TIME_COLUMN = "latestTime";
+
+  private final SplunkPluginConfig config;
+  private final SplunkSubScan subScan;
+  private final List<SchemaPath> projectedColumns;
+  private final Service splunkService;
+  private final SplunkScanSpec subScanSpec;
+  private final CsvParserSettings csvSettings;
+  private JobExportArgs exportArgs;
+  private InputStream searchResults;
+  private CsvParser csvReader;
+  private String[] firstRow;
+  private CustomErrorContext errorContext;
+
+  private List<SplunkColumnWriter> columnWriters;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private Stopwatch timer;
+
+
+  public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.subScanSpec = subScan.getScanSpec();
+    SplunkConnection connection = new SplunkConnection(config);
+    this.splunkService = connection.connect();
+
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+    RowListProcessor rowProcessor = new RowListProcessor();
+    csvSettings.setProcessor(rowProcessor);
+    csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    timer = Stopwatch.createUnstarted();
+    timer.start();
+
+    this.errorContext = negotiator.parentErrorContext();
+
+    String queryString = buildQueryString();
+
+    logger.debug("Query Sent to Splunk: {}", queryString);
+    // Execute the query
+    searchResults = splunkService.export(queryString, exportArgs);
+    logger.debug("Time to execute query: {} milliseconds", timer.elapsed().toMillis());
+
+    /*
+    Splunk produces poor output from the API.  Of the available choices, CSV was the easiest to deal with.
+    Unfortunately, the data is not consistent: some fields are quoted and some are not.
+    */
+    this.csvReader = new CsvParser(csvSettings);
+    logger.debug("Time to open CSV Parser: {} milliseconds", timer.elapsed().toMillis());
+    csvReader.beginParsing(searchResults, "utf-8");
+    logger.debug("Time to open input stream: {} milliseconds", timer.elapsed().toMillis());
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, false);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    logger.debug("Completed open function in {} milliseconds", timer.elapsed().toMillis());
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    timer.stop();
+    if (searchResults != null) {
+      AutoCloseables.closeSilently(searchResults);
+      searchResults = null;
+    }
+  }
+
+  /**
+   * Splunk returns the data in CSV format with some fields escaped and some not.  Splunk does
+   * not have the concept of datatypes, or at least does not make the metadata available in the API, so
+   * the best solution is to provide a list of columns that are known to be a specific data type, such as _time,
+   * _indextime and the various date components, and map those to the appropriate types.  Everything else is mapped as a string.
+   */
+  private TupleMetadata buildSchema() {
+
+    this.firstRow = csvReader.parseNext();
+
+    // Case for empty dataset
+    if (firstRow == null) {
+      return builder.buildSchema();
+    }
+
+    // Parse the first row
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.INT);
+      } else if (TS_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.TIMESTAMP);
+      } else {
+        try {
+          builder.addNullable(value, MinorType.VARCHAR);
+        } catch (Exception e) {
+          logger.warn("Splunk attempted to add duplicate column {}", value);
+        }
+      }
+    }
+    logger.debug("Time to build schema: {} milliseconds", timer.elapsed().toMillis());
+    return builder.buildSchema();
+  }
+
+  private void populateWriterArray() {
+    // Case for empty result set
+    if (firstRow == null || firstRow.length == 0) {
+      return;
+    }
+    columnWriters = new ArrayList<>();
+
+    int colPosition = 0;
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        columnWriters.add(new IntColumnWriter(value, rowWriter, colPosition));
+      } else if (TS_COLS.contains(value)) {
+        columnWriters.add(new TimestampColumnWriter(value, rowWriter, colPosition));
+      } else {
+        columnWriters.add(new StringColumnWriter(value, rowWriter, colPosition));
+      }
+      colPosition++;
+    }
+    logger.debug("Time to populate writer array: {} milliseconds", timer.elapsed().toMillis());
+  }
+
+  private boolean processRow() {
+    String[] nextRow = csvReader.parseNext();
+    if (nextRow == null) {
+      return false;
+    }
+    rowWriter.start();
+    for (SplunkColumnWriter columnWriter : columnWriters) {
+      columnWriter.load(nextRow);
+    }
+    rowWriter.save();
+    return true;
+  }
+
+  /**
+   * Checks to see whether the query is a star query. For our purposes, a star query is
+   * any query whose projection contains only the ** column and the SPECIAL_FIELDS, which are not projected.
+   * @return true if it is a star query, false if not.
+   */
+  private boolean isStarQuery() {
+    List<SplunkUtils.SPECIAL_FIELDS> specialFields = Arrays.asList(SplunkUtils.SPECIAL_FIELDS.values());
+
+    for (SchemaPath path: projectedColumns) {

Review comment:
       Could `Utilities#isStarQuery` be used for the first part of this method?
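
To illustrate the question, here is a minimal, self-contained sketch of how the check could be split in two: a generic "does the projection contain the star column" step (the part that an existing helper such as `Utilities#isStarQuery` might cover, assuming it accepts the collection of projected columns) and a Splunk-specific step that tolerates the special fields. Everything in it is an assumption for illustration: plain strings stand in for `SchemaPath`, the `SPECIAL_FIELDS` set is a hypothetical stand-in for `SplunkUtils.SPECIAL_FIELDS`, and the class and method names are not part of this PR.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only; not the PR's implementation.
final class StarQuerySketch {

  // The wildcard column name, written as ** in the Javadoc of the method under review.
  static final String DYNAMIC_STAR = "**";

  // Hypothetical stand-in for SplunkUtils.SPECIAL_FIELDS (assumed values).
  static final Set<String> SPECIAL_FIELDS =
      new HashSet<>(Arrays.asList("earliestTime", "latestTime"));

  // Generic step: true if the projection contains the star column at all.
  static boolean containsStar(List<String> projected) {
    return projected.contains(DYNAMIC_STAR);
  }

  // Splunk-specific step: a star query projects only ** plus special fields.
  static boolean isStarQuery(List<String> projected) {
    if (!containsStar(projected)) {
      return false;
    }
    for (String column : projected) {
      if (!DYNAMIC_STAR.equals(column) && !SPECIAL_FIELDS.contains(column)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isStarQuery(Arrays.asList("**")));                 // true
    System.out.println(isStarQuery(Arrays.asList("**", "earliestTime"))); // true
    System.out.println(isStarQuery(Arrays.asList("**", "host")));         // false
    System.out.println(isStarQuery(Arrays.asList("host")));               // false
  }
}
```

If the existing helper already covers the first step, only the loop over special fields would need to stay in the reader.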

##########
File path: contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.EntityCollection;
+import com.splunk.Index;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SplunkSchemaFactory extends AbstractSchemaFactory {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkSchemaFactory.class);
+  private static final String SPL_TABLE_NAME = "spl";
+  private final SplunkStoragePlugin plugin;
+  private final EntityCollection<Index> indexes;
+
+  public SplunkSchemaFactory(SplunkStoragePlugin plugin) {
+    super(plugin.getName());
+    this.plugin = plugin;
+    SplunkPluginConfig config = plugin.getConfig();
+    SplunkConnection connection = new SplunkConnection(config);
+
+    // Get Splunk Indexes
+    connection.connect();
+    indexes = connection.getIndexes();
+  }
+
+  @Override
+

Review comment:
       ```suggestion
   ```

##########
File path: contrib/storage-splunk/src/test/resources/logback-test.xml.bak
##########
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<configuration>
+  <if condition='property("drill.lilith.enable").equalsIgnoreCase("true")'>
+    <then>
+      <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
+        <Compressing>true</Compressing>
+        <ReconnectionDelay>10000</ReconnectionDelay>
+        <IncludeCallerData>true</IncludeCallerData>
+        <RemoteHosts>${LILITH_HOSTNAME:-localhost}</RemoteHosts>
+      </appender>
+
+      <logger name="org.apache.drill" additivity="false">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+
+      <logger name="query.logger" additivity="false">
+        <level value="ERROR"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+      <logger name="org.apache.drill.exec.store.splunk">
+        <level value="DEBUG"/>
+        <appender-ref ref="SOCKET"/>
+      </logger>
+    </then>
+  </if>
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <!-- encoders are assigned the type
+         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.drill.exec.store.splunk" additivity="false">
+    <level value="DEBUG" />
+    <appender-ref ref="STDOUT" />
+  </logger>
+</configuration>

Review comment:
       ```suggestion
   </configuration>
   
   ```



