vdiravka commented on a change in pull request #2089:
URL: https://github.com/apache/drill/pull/2089#discussion_r567140726



##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import java.util.Arrays;
+
+public class SplunkUtils {
+  /**
+   * These are special fields that alter the queries sent to Splunk.
+   */
+  public enum SPECIAL_FIELDS {
+    /**
+     * The sourcetype of a query. Specifying the sourcetype can improve query 
performance.
+     */
+    sourcetype,

Review comment:
       > Because they are constants, the names of an enum type's fields are in 
uppercase letters.
   
   https://docs.oracle.com/javase/tutorial/java/javaOO/enum.html

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkQueryBuilder.java
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.store.base.filter.RelOp;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+
+import java.util.List;
+import java.util.Map;
+
+public class SplunkQueryBuilder {
+  public final static String EQUAL_OPERATOR = "=";
+  public final static String NOT_EQUAL_OPERATOR = "!=";
+  public final static String GREATER_THAN = ">";
+  public final static String GREATER_THAN_EQ = ">=";
+  public final static String LESS_THAN = "<";
+  public final static String LESS_THAN_EQ = "<=";
+
+  private String query;
+  private String sourceTypes;
+  private String fieldList;
+  private String filters;
+  private int sourcetypeCount;
+  private int limit;
+
+  public SplunkQueryBuilder (String index) {
+    this.filters = "";
+    sourcetypeCount = 0;
+    query = "search index=" + index;
+  }
+
+  public void addSourceType(String sourceType) {

Review comment:
       Could you add the `javadoc` to `SplunkUtils.SPECIAL_FIELDS.SOURCETYPE` 
to understand what this method means? Possibly some example

##########
File path: pom.xml
##########
@@ -216,6 +216,11 @@
       <id>jitpack.io</id>
       <url>https://jitpack.io</url>
     </repository>
+      <repository>
+        <id>splunk-artifactory</id>
+        <name>Splunk Releases</name>
+        <url>https://repo.spring.io/plugins-release/</url>

Review comment:
       Starting from 21 of January this jar can't be download from 
`https://repo.spring.io/plugins-release` repo
   
https://spring.io/blog/2020/10/29/notice-of-permissions-changes-to-repo-spring-io-fall-and-winter-2020#january-21-2021-was-jan-6
   ```
   [INFO] Drill : Contrib : Format : MaprDB .................. SKIPPED
   [INFO] 
------------------------------------------------------------------------
   [INFO] BUILD FAILURE
   [INFO] 
------------------------------------------------------------------------
   [INFO] Total time:  47:02 min
   [INFO] Finished at: 2021-02-03T00:00:26Z
   [INFO] 
------------------------------------------------------------------------
   Error:  Failed to execute goal on project drill-storage-splunk: Could not 
resolve dependencies for project 
org.apache.drill.contrib:drill-storage-splunk:jar:1.19.0-SNAPSHOT: Failed to 
collect dependencies at com.splunk:splunk:jar:1.6.5.0: Failed to read artifact 
descriptor for com.splunk:splunk:jar:1.6.5.0: Could not transfer artifact 
com.splunk:splunk:pom:1.6.5.0 from/to splunk-artifactory 
(https://repo.spring.io/plugins-release/): Authentication failed for 
https://repo.spring.io/plugins-release/com/splunk/splunk/1.6.5.0/splunk-1.6.5.0.pom
 401 Unauthorized -> [Help 1]
   ```
   
   I'll change it to the following Splunk SDK:
   https://github.com/splunk/splunk-sdk-java/tree/1.6.5#using-maven

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class SplunkGroupScan extends AbstractGroupScan {
+
+  private final SplunkPluginConfig config;
+  private final List<SchemaPath> columns;
+  private final SplunkScanSpec splunkScanSpec;
+  private final Map<String, ExprNode.ColRelOpConstNode> filters;
+  private final ScanStats scanStats;
+  private final double filterSelectivity;
+  private final int maxRecords;
+
+  private int hashCode;
+
+  /**
+   * Creates a new group scan from the storage plugin.
+   */
+  public SplunkGroupScan (SplunkScanSpec scanSpec) {
+    super("no-user");
+    this.splunkScanSpec = scanSpec;
+    this.config = scanSpec.getConfig();
+    this.columns = ALL_COLUMNS;
+    this.filters = null;
+    this.filterSelectivity = 0.0;
+    this.maxRecords = -1;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Copies the group scan during many stages of Calcite operation.
+   */
+  public SplunkGroupScan(SplunkGroupScan that) {
+    super(that);
+    this.config = that.config;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.columns = that.columns;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+
+    // Calcite makes many copies in the later stage of planning
+    // without changing anything. Retain the previous stats.
+    this.scanStats = that.scanStats;
+  }
+
+  /**
+   * Applies columns. Oddly called multiple times, even when
+   * the scan already has columns.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.columns = columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Oddly called later in planning, after earlier assigning columns,
+    // to again assign columns. Retain filters, but compute new stats.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+
+  }
+
+  /**
+   * Adds a filter to the scan.
+   */
+  public SplunkGroupScan(SplunkGroupScan that, Map<String, 
ExprNode.ColRelOpConstNode> filters,
+                       double filterSelectivity) {
+    super(that);
+    this.columns = that.columns;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+
+    // Applies a filter.
+    this.filters = filters;
+    this.filterSelectivity = filterSelectivity;
+    this.maxRecords = that.maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Deserialize a group scan. Not called in normal operation. Probably used
+   * only if Drill executes a logical plan.
+   */
+  @JsonCreator
+  public SplunkGroupScan(
+    @JsonProperty("config") SplunkPluginConfig config,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("splunkScanSpec") SplunkScanSpec splunkScanSpec,
+    @JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords
+  ) {
+    super("no-user");
+    this.config = config;
+    this.columns = columns;
+    this.splunkScanSpec = splunkScanSpec;
+    this.filters = filters;
+    this.filterSelectivity = selectivity;
+    this.maxRecords = maxRecords;
+    this.scanStats = computeScanStats();
+  }
+
+  /**
+   * Adds a limit to the group scan
+   * @param that Previous SplunkGroupScan
+   * @param maxRecords the limit pushdown
+   */
+  public SplunkGroupScan(SplunkGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    // Apply the limit
+    this.maxRecords = maxRecords;
+    this.splunkScanSpec = that.splunkScanSpec;
+    this.config = that.config;
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+  }
+
+  @JsonProperty("config")
+  public SplunkPluginConfig config() { return config; }
+
+  @JsonProperty("columns")
+  public List<SchemaPath> columns() { return columns; }
+
+  @JsonProperty("splunkScanSpec")
+  public SplunkScanSpec splunkScanSpec() { return splunkScanSpec; }
+
+  @JsonProperty("filters")
+  public Map<String, ExprNode.ColRelOpConstNode> filters() { return filters; }
+
+  @JsonProperty("maxRecords")
+  public int maxRecords() { return maxRecords; }
+
+  @JsonProperty("filterSelectivity")
+  public double selectivity() { return filterSelectivity; }
+
+
+  @Override
+  public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> 
endpoints) { }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    return new SplunkSubScan(config, splunkScanSpec, columns, filters, 
maxRecords);
+  }
+
+  @Override
+  public int getMaxParallelizationWidth() {
+    return 1;

Review comment:
       Agree, it can be considered as a future improvement

##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SplunkBatchReader.class);
+  private static final List<String> INT_COLS = new 
ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", 
"date_second", "date_year", "linecount"));
+  private static final List<String> TS_COLS = new 
ArrayList<>(Arrays.asList("_indextime", "_time"));
+  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  private static final String LATEST_TIME_COLUMN = "latestTime";
+
+  private final SplunkPluginConfig config;
+  private final SplunkSubScan subScan;
+  private final List<SchemaPath> projectedColumns;
+  private final Service splunkService;
+  private final SplunkScanSpec subScanSpec;
+  private final CsvParserSettings csvSettings;
+  private JobExportArgs exportArgs;
+  private InputStream searchResults;
+  private CsvParser csvReader;
+  private String[] firstRow;
+  private CustomErrorContext errorContext;
+
+  private List<SplunkColumnWriter> columnWriters;
+  private SchemaBuilder builder;
+  private RowSetLoader rowWriter;
+  private Stopwatch timer;
+
+
+  public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.subScanSpec = subScan.getScanSpec();
+    SplunkConnection connection = new SplunkConnection(config);
+    this.splunkService = connection.connect();
+
+    this.csvSettings = new CsvParserSettings();
+    csvSettings.setLineSeparatorDetectionEnabled(true);
+    RowListProcessor rowProcessor = new RowListProcessor();
+    csvSettings.setProcessor(rowProcessor);
+    csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+  }
+
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    timer = Stopwatch.createUnstarted();
+    timer.start();
+
+    this.errorContext = negotiator.parentErrorContext();
+
+    String queryString = buildQueryString();
+
+    logger.debug("Query Sent to Splunk: {}", queryString);
+    // Execute the query
+    searchResults = splunkService.export(queryString, exportArgs);
+    logger.debug("Time to execute query: {} milliseconds", 
timer.elapsed().getNano() / 100000);
+
+    /*
+    Splunk produces poor output from the API.  Of the available choices, CSV 
was the easiest to deal with.  Unfortunately,
+    the data is not consistent, as some fields are quoted, some are not.
+    */
+    this.csvReader = new CsvParser(csvSettings);
+    logger.debug("Time to open CSV Parser: {} milliseconds", 
timer.elapsed().getNano() / 100000);
+    csvReader.beginParsing(searchResults, "utf-8");
+    logger.debug("Time to open input stream: {} milliseconds", 
timer.elapsed().getNano() / 100000);
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, false);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    logger.debug("Completed open function in {} milliseconds", 
timer.elapsed().getNano() / 100000);
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    timer.stop();
+    if (searchResults != null) {
+      AutoCloseables.closeSilently(searchResults);
+      searchResults = null;
+    }
+  }
+
+  /**
+   * Splunk returns the data in CSV format with some fields escaped and some 
not.  Splunk does
+   * not have the concept of datatypes, or at least does not make the metadata 
available in the API, so
+   * the best solution is to provide a list of columns that are known to be a 
specific data type such as _time,
+   * indextime, the various date components etc and map those as the 
appropriate columns.  Then map everything else as a string.
+   */
+  private TupleMetadata buildSchema() {
+
+    this.firstRow = csvReader.parseNext();
+
+    // Case for empty dataset
+    if (firstRow == null) {
+      return builder.buildSchema();
+    }
+
+    // Parse the first row
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.INT);
+      } else if (TS_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.TIMESTAMP);
+      } else {
+        try {
+          builder.addNullable(value, MinorType.VARCHAR);
+        } catch (Exception e) {
+          logger.warn("Splunk attempted to add duplicate column {}", value);

Review comment:
       I mean possibly we want to catch some specific kind of Exception with an 
above message. And catch general Exception with other general message




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to