cgivre commented on a change in pull request #2089:
URL: https://github.com/apache/drill/pull/2089#discussion_r571650540



##########
File path: 
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
##########
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import com.splunk.JobExportArgs;
+import com.splunk.Service;
+import com.univocity.parsers.common.processor.RowListProcessor;
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.base.filter.ExprNode;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
+
+  // Class-wide SLF4J logger; used for debug timing messages and duplicate-column warnings.
+  private static final Logger logger = 
LoggerFactory.getLogger(SplunkBatchReader.class);
+  // Header names that buildSchema() maps to nullable INT columns.
+  private static final List<String> INT_COLS = new 
ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", 
"date_second", "date_year", "linecount"));
+  // Header names that buildSchema() maps to nullable TIMESTAMP columns.
+  private static final List<String> TS_COLS = new 
ArrayList<>(Arrays.asList("_indextime", "_time"));
+  // NOTE(review): not referenced in the visible part of the file; presumably the
+  // names of the pseudo-columns carrying the search time bounds - confirm against
+  // buildQueryString().
+  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  private static final String LATEST_TIME_COLUMN = "latestTime";
+
+  // Plugin configuration; also used to construct the SplunkConnection in the constructor.
+  private final SplunkPluginConfig config;
+  // Sub-scan this reader was created for; source of the projected columns and scan spec.
+  private final SplunkSubScan subScan;
+  // Columns requested by the query, taken from subScan.getColumns().
+  private final List<SchemaPath> projectedColumns;
+  // Splunk service handle obtained from SplunkConnection.connect() in the constructor.
+  private final Service splunkService;
+  // Scan spec taken from subScan.getScanSpec().
+  private final SplunkScanSpec subScanSpec;
+  // CSV parser settings prepared in the constructor and used by open().
+  private final CsvParserSettings csvSettings;
+  // Arguments passed to splunkService.export() in open().
+  // NOTE(review): never assigned in the visible code; presumably populated by
+  // buildQueryString() - confirm.
+  private JobExportArgs exportArgs;
+  // Stream returned by the Splunk export call; closed in close().
+  private InputStream searchResults;
+  // CSV parser reading from searchResults.
+  private CsvParser csvReader;
+  // First row returned by the parser; its values are used as the column names.
+  private String[] firstRow;
+  // Error context obtained from the schema negotiator in open().
+  private CustomErrorContext errorContext;
+
+  // One writer per header column, created by populateWriterArray().
+  private List<SplunkColumnWriter> columnWriters;
+  // Schema builder created in open() and filled by buildSchema().
+  private SchemaBuilder builder;
+  // Row writer obtained from the result set loader in open().
+  private RowSetLoader rowWriter;
+  // Stopwatch started in open() and stopped in close(); used only for debug timing logs.
+  private Stopwatch timer;
+
+
+  /**
+   * Creates a reader for one Splunk sub-scan. Connects to Splunk immediately and
+   * prepares the CSV parser settings that {@link #open(SchemaNegotiator)} uses later.
+   *
+   * @param config  the Splunk plugin configuration used to open the connection
+   * @param subScan the sub-scan supplying the projected columns and the scan spec
+   */
+  public SplunkBatchReader(SplunkPluginConfig config, SplunkSubScan subScan) {
+    this.config = config;
+    this.subScan = subScan;
+    this.projectedColumns = subScan.getColumns();
+    this.subScanSpec = subScan.getScanSpec();
+    this.splunkService = new SplunkConnection(config).connect();
+
+    // Let the parser detect the line separator and allow very large field values.
+    // NOTE(review): a RowListProcessor is registered although rows are pulled with
+    // parseNext(); verify it does not retain every parsed row in memory.
+    CsvParserSettings parserSettings = new CsvParserSettings();
+    parserSettings.setLineSeparatorDetectionEnabled(true);
+    parserSettings.setProcessor(new RowListProcessor());
+    parserSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+    this.csvSettings = parserSettings;
+  }
+
+  /**
+   * Sends the query to Splunk, starts CSV parsing of the exported result stream,
+   * derives the Drill schema from the first CSV row and creates the column writers.
+   *
+   * @param negotiator the schema negotiator supplied by the scan framework; provides
+   *                   the parent error context and builds the result set loader
+   * @return always {@code true}
+   */
+  @Override
+  public boolean open(SchemaNegotiator negotiator) {
+    timer = Stopwatch.createStarted();
+
+    this.errorContext = negotiator.parentErrorContext();
+
+    String queryString = buildQueryString();
+
+    logger.debug("Query Sent to Splunk: {}", queryString);
+    // Execute the query
+    searchResults = splunkService.export(queryString, exportArgs);
+    // Duration.getNano() is only the sub-second nanosecond component, so it cannot be
+    // used to derive elapsed milliseconds; toMillis() reports the whole duration.
+    logger.debug("Time to execute query: {} milliseconds", timer.elapsed().toMillis());
+
+    /*
+    Splunk produces poor output from the API.  Of the available choices, CSV was the
+    easiest to deal with.  Unfortunately, the data is not consistent, as some fields
+    are quoted, some are not.
+    */
+    this.csvReader = new CsvParser(csvSettings);
+    logger.debug("Time to open CSV Parser: {} milliseconds", timer.elapsed().toMillis());
+    csvReader.beginParsing(searchResults, "utf-8");
+    logger.debug("Time to open input stream: {} milliseconds", timer.elapsed().toMillis());
+
+    // Build the Schema
+    builder = new SchemaBuilder();
+    TupleMetadata drillSchema = buildSchema();
+    negotiator.tableSchema(drillSchema, false);
+    ResultSetLoader resultLoader = negotiator.build();
+
+    // Create ScalarWriters
+    rowWriter = resultLoader.writer();
+    populateWriterArray();
+    logger.debug("Completed open function in {} milliseconds", timer.elapsed().toMillis());
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processRow()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Stops the timing stopwatch and closes the Splunk result stream. Safe to call when
+   * {@link #open(SchemaNegotiator)} never ran and safe to call more than once.
+   */
+  @Override
+  public void close() {
+    // Stopwatch.stop() throws IllegalStateException when the stopwatch is not running,
+    // and the timer is null if open() was never reached.
+    if (timer != null && timer.isRunning()) {
+      timer.stop();
+    }
+    if (searchResults != null) {
+      AutoCloseables.closeSilently(searchResults);
+      searchResults = null;
+    }
+  }
+
+  /**
+   * Splunk returns the data in CSV format with some fields escaped and some not.
+   * Splunk does not have the concept of datatypes, or at least does not make the
+   * metadata available in the API, so the best solution is to provide a list of
+   * columns that are known to be a specific data type such as _time, _indextime,
+   * the various date components etc. and map those to the appropriate types.
+   * Everything else is mapped as a string.
+   * <p>
+   * Reads the first CSV row into {@code firstRow} and treats its values as the
+   * column names.
+   *
+   * @return the schema built from the first row; an empty schema when the result
+   *         set has no rows
+   */
+  private TupleMetadata buildSchema() {
+
+    this.firstRow = csvReader.parseNext();
+
+    // Case for empty dataset
+    if (firstRow == null) {
+      return builder.buildSchema();
+    }
+
+    // Parse the first row
+    for (String value : firstRow) {
+      if (INT_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.INT);
+      } else if (TS_COLS.contains(value)) {
+        builder.addNullable(value, MinorType.TIMESTAMP);
+      } else {
+        try {
+          builder.addNullable(value, MinorType.VARCHAR);
+        } catch (Exception e) {
+          // Deliberate best effort: a column that cannot be added is logged and skipped.
+          logger.warn("Splunk attempted to add duplicate column {}", value);
+        }
+      }
+    }
+    // Duration.getNano() is only the sub-second nanosecond component; toMillis()
+    // reports the whole elapsed time in milliseconds.
+    logger.debug("Time to build schema: {} milliseconds", timer.elapsed().toMillis());
+    return builder.buildSchema();
+  }
+
+  /**
+   * Creates one column writer per header column in {@code firstRow}, choosing the
+   * writer type with the same INT / TIMESTAMP / string rules as the schema. Each
+   * writer is given the position of its column within the CSV row.
+   */
+  private void populateWriterArray() {
+    // Always initialise the list so processRow() never sees a null writer list,
+    // even when the header row is missing or empty.
+    columnWriters = new ArrayList<>();
+
+    // Case for empty result set
+    if (firstRow == null || firstRow.length == 0) {
+      return;
+    }
+
+    for (int colPosition = 0; colPosition < firstRow.length; colPosition++) {
+      String value = firstRow[colPosition];
+      if (INT_COLS.contains(value)) {
+        columnWriters.add(new IntColumnWriter(value, rowWriter, colPosition));
+      } else if (TS_COLS.contains(value)) {
+        columnWriters.add(new TimestampColumnWriter(value, rowWriter, colPosition));
+      } else {
+        columnWriters.add(new StringColumnWriter(value, rowWriter, colPosition));
+      }
+    }
+    // Duration.getNano() is only the sub-second nanosecond component; toMillis()
+    // reports the whole elapsed time in milliseconds.
+    logger.debug("Time to populate writer array: {} milliseconds", timer.elapsed().toMillis());
+  }
+
+  private boolean processRow() {
+    String[] nextRow = csvReader.parseNext();
+    if (nextRow == null) {
+      return false;
+    }
+    rowWriter.start();
+    for (SplunkColumnWriter columnWriter : columnWriters) {
+      columnWriter.load(nextRow);
+    }
+    rowWriter.save();
+    return true;
+  }
+
+  /**
+   * Checks to see whether the query is a star query. For our purposes, the 
star query is
+   * anything that contains only the ** and the SPECIAL_COLUMNS which are not 
projected.
+   * @return true if it is a star query, false if not.
+   */
+  private boolean isStarQuery() {
+    List<SplunkUtils.SPECIAL_FIELDS> specialFields = 
Arrays.asList(SplunkUtils.SPECIAL_FIELDS.values());
+
+    for (SchemaPath path: projectedColumns) {

Review comment:
       Fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to