vdiravka commented on a change in pull request #2089:
URL: https://github.com/apache/drill/pull/2089#discussion_r572429790
##########
File path: contrib/storage-splunk/README.md
##########
@@ -0,0 +1,152 @@
+# Drill Connector for Splunk
+This plugin enables Drill to query Splunk.
+
+## Configuration
+To connect Drill to Splunk, create a new storage plugin with the following configuration:
+
+Splunk uses port `8089` for its management (REST) API. This port must be open for Drill to query Splunk.
+
+```json
+{
+ "type":"splunk",
+ "username": "admin",
+ "password": "changeme",
+ "hostname": "localhost",
+ "port": 8089,
+ "earliestTime": "-14d",
+ "latestTime": "now",
+ "enabled": false
+}
+```
+
+## Understanding Splunk's Data Model
+Splunk's primary use case is analyzing timestamped event logs. Data is indexed by timestamp, with the most recent events indexed first, and by default Splunk returns results in reverse chronological order. Large Splunk installations place older data into hot, warm and cold storage buckets, with the "cold" storage on the slowest and cheapest disks.
+
+For this reason, it is **very** important to put time boundaries on your Splunk queries. The Drill plugin allows you to set default time boundaries in the configuration that apply to every query you run. Alternatively, you can set the time boundaries at query time. In either case, you will get the best performance by asking Splunk for the smallest amount of data possible.
+
+## Understanding Drill's Data Model with Splunk
+Drill treats Splunk indexes as tables. Splunk's access model does not restrict
to the catalog, but does restrict access to the actual data. It is therefore
possible that you can
+ see the names of indexes to which you do not have access. You can view the
list of available indexes with a `SHOW TABLES IN splunk` query.
+
+```
+apache drill> SHOW TABLES IN splunk;
++--------------+----------------+
+| TABLE_SCHEMA | TABLE_NAME |
++--------------+----------------+
+| splunk | summary |
+| splunk | splunklogger |
+| splunk | _thefishbucket |
+| splunk | _audit |
+| splunk | _internal |
+| splunk | _introspection |
+| splunk | main |
+| splunk | history |
+| splunk | _telemetry |
++--------------+----------------+
+9 rows selected (0.304 seconds)
+```
+To query Splunk from Drill, use the following format:
+```sql
+SELECT <fields>
+FROM splunk.<index>
+```
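+
+For example, assuming the `main` index from the listing above contains Splunk's standard `source` and `sourcetype` fields, a concrete query (purely illustrative) might look like this:
+
+```sql
+SELECT `_time`, `source`, `sourcetype`
+FROM splunk.main
+WHERE earliestTime='-1h' AND latestTime='now'
+```
+The `earliestTime` and `latestTime` bounds are explained in the next section.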
+
+## Bounding Your Queries
+When you learn to query Splunk via its own interface, the first thing you learn is to bound your queries so that they cover the shortest time span possible. When using Drill to query Splunk, it is advisable to do the same thing, and Drill offers two ways to accomplish this: via the configuration and at query time.
+
+### Bounding Your Queries at Query Time
+The easiest way to bound your query is at query time, via special filters in the `WHERE` clause. There are two special fields, `earliestTime` and `latestTime`, which can be set to bound the query. If they are not set, the query is bounded by the defaults set in the configuration.
+
+You can use any of the time formats specified in the Splunk documentation here:
+
+https://docs.splunk.com/Documentation/Splunk/8.0.3/SearchReference/SearchTimeModifiers
+
+So if you wanted to see your data for the last 15 minutes, you could execute the following query:
+
+```sql
+SELECT <fields>
+FROM splunk.<index>
+WHERE earliestTime='-15m' AND latestTime='now'
+```
+The variables set in a query override the defaults from the configuration.
+
+## Data Types
+Splunk does not have sophisticated data types and unfortunately does not provide metadata with its query results. With the exception of the fields below, Drill interprets all fields as `VARCHAR`, so you will have to convert them to the appropriate data type at query time, as shown in the example after the field lists below.
+
+#### Timestamp Fields
+* `_indextime`
+* `_time`
+
+#### Numeric Fields
+* `date_hour`
+* `date_mday`
+* `date_minute`
+* `date_second`
+* `date_year`
+* `linecount`
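+
+For example, to work with numeric values stored in hypothetical `status` and `bytes` fields (these names are illustrative and depend entirely on your data), you could cast them at query time:
+
+```sql
+SELECT CAST(`status` AS INT) AS status,
+       CAST(`bytes` AS BIGINT) AS bytes
+FROM splunk.main
+WHERE earliestTime='-15m' AND latestTime='now'
+```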
+
+### Nested Data
+Splunk has two different types of nested data, which roughly map to Drill's `LIST` and `MAP` data types. Unfortunately, there is no easy way to identify whether a field is nested at query time, as Splunk does not provide any metadata, so all fields are treated as `VARCHAR`.
Review comment:
I'll check [`Drill Schema Provision`](https://drill.apache.org/docs/using-drill-metastore/#using-schema-provisioning-feature-with-drill-metastore) with Splunk storage. If any changes are needed, that can be done as a future improvement.
##########
File path: contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
+ private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
+ private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
+ private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+ private static final String LATEST_TIME_COLUMN = "latestTime";
+
+ private final SplunkPluginConfig config;
+ private final SplunkSubScan subScan;
+ private final List<SchemaPath> projectedColumns;
+ private final Service splunkService;
+ private final SplunkScanSpec subScanSpec;
+ private final CsvParserSettings csvSettings;
+ private JobExportArgs exportArgs;
+ private InputStream searchResults;
+ private CsvParser csvReader;
+ private String[] firstRow;
+ private CustomErrorContext errorContext;
+
+ private List<SplunkColumnWriter> columnWriters;
+ private SchemaBuilder builder;
+ private RowSetLoader rowWriter;
+ private Stopwatch timer;
+
+
+ public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+ this.config = config;
+ this.subScan = subScan;
+ this.projectedColumns = subScan.getColumns();
+ this.subScanSpec = subScan.getScanSpec();
+ SplunkConnection connection = new SplunkConnection(config);
+ this.splunkService = connection.connect();
+
+ this.csvSettings = new CsvParserSettings();
+ csvSettings.setLineSeparatorDetectionEnabled(true);
+ RowListProcessor rowProcessor = new RowListProcessor();
+ csvSettings.setProcessor(rowProcessor);
+ csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+ }
+
+ @Override
+ public boolean open(SchemaNegotiator negotiator) {
+ timer = Stopwatch.createUnstarted();
+ timer.start();
+
+ this.errorContext = negotiator.parentErrorContext();
+
+ String queryString = buildQueryString();
+
+ logger.debug("Query Sent to Splunk: {}", queryString);
+ // Execute the query
+ searchResults = splunkService.export(queryString, exportArgs);
+ logger.debug("Time to execute query: {} milliseconds",
timer.elapsed().getNano() / 100000);
+
+ /*
+  Splunk produces poor output from its API. Of the available formats, CSV was the easiest to deal with.
+  Unfortunately, the output is not consistent: some fields are quoted and some are not.
+ */
+ this.csvReader = new CsvParser(csvSettings);
+ logger.debug("Time to open CSV Parser: {} milliseconds",
timer.elapsed().getNano() / 100000);
+ csvReader.beginParsing(searchResults, "utf-8");
+ logger.debug("Time to open input stream: {} milliseconds",
timer.elapsed().getNano() / 100000);
+
+ // Build the Schema
+ builder = new SchemaBuilder();
+ TupleMetadata drillSchema = buildSchema();
+ negotiator.tableSchema(drillSchema, false);
+ ResultSetLoader resultLoader = negotiator.build();
+
+ // Create ScalarWriters
+ rowWriter = resultLoader.writer();
+ populateWriterArray();
+ logger.debug("Completed open function in {} milliseconds",
timer.elapsed().getNano() / 100000);
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ while (!rowWriter.isFull()) {
+ if (!processRow()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ timer.stop();
+ if (searchResults != null) {
+ AutoCloseables.closeSilently(searchResults);
+ searchResults = null;
+ }
+ }
+
+ /**
+ * Splunk returns data in CSV format with some fields escaped and some not. Splunk does not have the
+ * concept of data types, or at least does not expose that metadata through its API, so the best approach
+ * is to keep a list of columns known to have a specific data type, such as _time, _indextime and the
+ * various date components, map those to the appropriate Drill types, and map everything else as VARCHAR.
+ */
+ private TupleMetadata buildSchema() {
+
+ this.firstRow = csvReader.parseNext();
+
+ // Case for empty dataset
+ if (firstRow == null) {
+ return builder.buildSchema();
+ }
+
+ // Parse the first row
+ for (String value : firstRow) {
+ if (INT_COLS.contains(value)) {
+ builder.addNullable(value, MinorType.INT);
+ } else if (TS_COLS.contains(value)) {
+ builder.addNullable(value, MinorType.TIMESTAMP);
+ } else {
+ try {
+ builder.addNullable(value, MinorType.VARCHAR);
+ } catch (Exception e) {
+ logger.warn("Splunk attempted to add duplicate column {}", value);
Review comment:
Possibly a warning is fine for now
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]