This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d6a95873 DRILL-8504: Add Schema Caching to Splunk Plugin (#2929)
70d6a95873 is described below

commit 70d6a95873e919ffb27880863df9c404c24d55a1
Author: Charles S. Givre <cgi...@apache.org>
AuthorDate: Mon Nov 18 07:45:25 2024 -0500

    DRILL-8504: Add Schema Caching to Splunk Plugin (#2929)
---
 .github/workflows/ci.yml                           |  3 +-
 contrib/storage-splunk/README.md                   | 19 ++++--
 contrib/storage-splunk/pom.xml                     |  6 ++
 .../drill/exec/store/splunk/SplunkBatchReader.java | 19 +++---
 .../drill/exec/store/splunk/SplunkGroupScan.java   | 58 +++++++++++++++++
 .../exec/store/splunk/SplunkPluginConfig.java      | 72 +++++++++++++++++-----
 .../drill/exec/store/splunk/SplunkSchema.java      | 63 +++++++++++++++++--
 .../drill/exec/store/splunk/SplunkSubScan.java     |  3 +-
 .../drill/exec/store/splunk/SplunkUtils.java       |  3 +
 .../exec/store/splunk/SplunkConnectionTest.java    |  9 ++-
 .../drill/exec/store/splunk/SplunkIndexesTest.java |  9 ++-
 .../exec/store/splunk/SplunkLimitPushDownTest.java |  1 +
 .../drill/exec/store/splunk/SplunkPluginTest.java  |  9 ++-
 .../drill/exec/store/splunk/SplunkTestSuite.java   | 17 ++++-
 .../drill/exec/store/splunk/SplunkWriterTest.java  |  4 +-
 metastore/iceberg-metastore/pom.xml                |  4 --
 pom.xml                                            |  1 +
 17 files changed, 241 insertions(+), 59 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 39c3fc0aa9..095fb44343 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -55,7 +55,7 @@ jobs:
         # added to the Runner and memory hungry tests are run separately.
         run: |
           sudo sh -c "
-            fallocate -l 4G /tmp/swapfile
+            fallocate -l 2G /tmp/swapfile
             chmod 0600 /tmp/swapfile
             mkswap /tmp/swapfile
             swapon /tmp/swapfile
@@ -67,6 +67,7 @@ jobs:
       - name: Remove swap space
         run : |
           sudo sh -c "
+            free -h
             swapoff /tmp/swapfile
             rm /tmp/swapfile
           "
diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md
index 08e3d65de8..188c7247d8 100644
--- a/contrib/storage-splunk/README.md
+++ b/contrib/storage-splunk/README.md
@@ -4,7 +4,7 @@ This plugin enables Drill to query Splunk.
 ## Configuration
 
| Option                | Default   | Description                                                        | Since |
-|-----------------------| --------- | --------------------------------------------------------------- | ----- |
+|-----------------------| --------- | --------------------------------------------------------------- |-------|
| type                  | (none)    | Set to "splunk" to use this plugin                                 | 1.19  |
| username              | null      | Splunk username to be used by Drill                                | 1.19  |
| password              | null      | Splunk password to be used by Drill                                | 1.19  |
@@ -13,12 +13,15 @@ This plugin enables Drill to query Splunk.
| port                  | 8089      | TCP port over which Drill will connect to Splunk                   | 1.19  |
| earliestTime          | null      | Global earliest record timestamp default                           | 1.19  |
| latestTime            | null      | Global latest record timestamp default                             | 1.19  |
-| app                   | null      | The application context of the service[^1]                         | 2.0   |
-| owner                 | null      | The owner context of the service[^1]                               | 2.0   |
-| token                 | null      | A Splunk authentication token to use for the session[^2]           | 2.0   |
-| cookie                | null      | A valid login cookie                                               | 2.0   |
-| validateCertificates  | true      | Whether the Splunk client will validates the server's SSL cert     | 2.0   |
+| app                   | null      | The application context of the service[^1]                         | 1.21  |
+| owner                 | null      | The owner context of the service[^1]                               | 1.21  |
+| token                 | null      | A Splunk authentication token to use for the session[^2]           | 1.21  |
+| cookie                | null      | A valid login cookie                                               | 1.21  |
+| validateCertificates  | true      | Whether the Splunk client will validates the server's SSL cert     | 1.21  |
| validateHostname      | true      | Whether the Splunk client will validate the server's host name     | 1.22  |
+| maxColumns            | 1024      | The maximum number of columns Drill will accept from Splunk        | 1.22  |
+| maxCacheSize          | 10000     | The size (in bytes) of Splunk's schema cache.                      | 1.22  |
+| cacheExpiration       | 1024      | The number of minutes for Drill to persist the schema cache.       | 1.22  |
 
 [^1]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Admin/Apparchitectureandobjectownership) for more information.
 [^2]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Security/CreateAuthTokens) for more information.
@@ -46,6 +49,10 @@ To bypass it by Drill please specify "reconnectRetries": 3. It allows you to ret
 ### User Translation
 The Splunk plugin supports user translation.  Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials.
 
+## Schema Caching
+For every query that you send to Splunk from Drill, Drill will have to pull schema information from Splunk.  If you have a lot of indexes, this process can cause slow planning time.  To improve planning time, you can configure Drill to cache the index names so that it does not need to make additional calls to Splunk.
+
+There are two configuration parameters for the schema caching: `maxCacheSize` and `cacheExpiration`.  The maxCacheSize defaults to 10k bytes and the `cacheExpiration` defaults to 1024 minutes.  To disable schema caching simply set the `cacheExpiration` parameter to a value less than zero.
 
 ## Understanding Splunk's Data Model
Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first.  By default, Splunk
diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml
index 9a68a6ffa0..165828aba0 100644
--- a/contrib/storage-splunk/pom.xml
+++ b/contrib/storage-splunk/pom.xml
@@ -58,6 +58,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>${caffeine.version}</version>
+    </dependency>
+
     <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
index 8c3fb45bf6..a02f5d0927 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
@@ -54,9 +54,6 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
   private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
   private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
-  private static final String EARLIEST_TIME_COLUMN = "earliestTime";
-  private static final String LATEST_TIME_COLUMN = "latestTime";
-
   private final SplunkPluginConfig config;
   private final SplunkSubScan subScan;
   private final List<SchemaPath> projectedColumns;
@@ -88,6 +85,8 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
     RowListProcessor rowProcessor = new RowListProcessor();
     csvSettings.setProcessor(rowProcessor);
     csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE);
+    // Splunk can produce a lot of columns. The SDK default maximum is 512.
+    csvSettings.setMaxColumns(config.getMaxColumns());
   }
 
   @Override
@@ -174,7 +173,7 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
         }
       }
     }
-    logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000);
+    logger.debug("Time to build schema: {} milliseconds", timer.elapsed().getNano() / 100000);
     return builder.buildSchema();
   }
 
@@ -241,18 +240,18 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
 
     // Splunk searches perform best when they are time bound.  This allows the user to set
     // default time boundaries in the config.  These will be overwritten in filter pushdowns
-    if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) {
-      earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString();
+    if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) {
+      earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString();
 
       // Remove from map
-      filters.remove(EARLIEST_TIME_COLUMN);
+      filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN);
     }
 
-    if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) {
-      latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString();
+    if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) {
+      latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString();
 
       // Remove from map so they are not pushed down into the query
-      filters.remove(LATEST_TIME_COLUMN);
+      filters.remove(SplunkUtils.LATEST_TIME_COLUMN);
     }
 
     if (earliestTime == null) {
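
For context on the new maxColumns option wired into SplunkBatchReader above: the reader parses Splunk's CSV results with univocity (the RowListProcessor and csvSettings calls in this hunk), and that parser rejects rows wider than 512 columns by default. A standalone sketch of raising the limit, using hypothetical sample data in place of Splunk output:

    import com.univocity.parsers.common.processor.RowListProcessor;
    import com.univocity.parsers.csv.CsvParser;
    import com.univocity.parsers.csv.CsvParserSettings;

    import java.io.StringReader;

    public class MaxColumnsSketch {
      public static void main(String[] args) {
        CsvParserSettings csvSettings = new CsvParserSettings();
        RowListProcessor rowProcessor = new RowListProcessor();
        csvSettings.setProcessor(rowProcessor);
        // Mirrors csvSettings.setMaxColumns(config.getMaxColumns()); 1024 is the plugin default.
        csvSettings.setMaxColumns(1024);

        // Hypothetical three-column result standing in for Splunk's CSV export.
        new CsvParser(csvSettings).parse(new StringReader("host,source,_time\nweb01,access.log,123\n"));
        System.out.println(rowProcessor.getRows().size() + " row(s) parsed");
      }
    }
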
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
index 9ab7de6d47..53596a2b54 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
@@ -309,6 +309,63 @@ public class SplunkGroupScan extends AbstractGroupScan {
     return new SplunkGroupScan(this, columns);
   }
 
+  /**
+   * Generates the query which will be sent to Splunk. This method exists for debugging purposes so
+   * that the actual SPL will be recorded in the query plan.
+   */
+  private String generateQuery() {
+    String earliestTime = null;
+    String latestTime = null;
+
+    // Splunk searches perform best when they are time bound.  This allows the user to set
+    // default time boundaries in the config.  These will be overwritten in filter pushdowns
+    if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) {
+      earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString();
+    }
+
+    if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) {
+      latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString();
+    }
+
+    if (earliestTime == null) {
+      earliestTime = config.getEarliestTime();
+    }
+
+    if (latestTime == null) {
+      latestTime = config.getLatestTime();
+    }
+
+    // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL"
+    // Index and spl filter
+    if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) {
+      if (filters != null && filters.containsKey("spl")) {
+        return filters.get("spl").value.value.toString();
+      }
+    }
+
+    SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName());
+
+    // Set the sourcetype
+    if (filters != null && filters.containsKey("sourcetype")) {
+      String sourcetype = filters.get("sourcetype").value.value.toString();
+      builder.addSourceType(sourcetype);
+    }
+
+    // Add projected columns, skipping star and specials.
+    for (SchemaPath projectedColumn: columns) {
+      builder.addField(projectedColumn.getAsUnescapedPath());
+    }
+
+    // Apply filters
+    builder.addFilters(filters);
+
+    // Apply limits
+    if (maxRecords > 0) {
+      builder.addLimit(maxRecords);
+    }
+    return builder.build();
+  }
+
   @Override
   public int hashCode() {
 
@@ -344,6 +401,7 @@ public class SplunkGroupScan extends AbstractGroupScan {
       .field("scan spec", splunkScanSpec)
       .field("columns", columns)
       .field("maxRecords", maxRecords)
+      .field("spl", generateQuery())
       .toString();
   }
 }
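
The new generateQuery() above only reuses builder calls that already exist in the plugin, so the SPL string recorded in the plan can be reproduced in isolation. A rough sketch, assuming the plugin classes are on the classpath (the exact output format is owned by SplunkQueryBuilder; a concrete expected string appears in the SplunkLimitPushDownTest change later in this diff):

    import org.apache.drill.exec.store.splunk.SplunkQueryBuilder;

    public class GenerateQuerySketch {
      public static void main(String[] args) {
        SplunkQueryBuilder builder = new SplunkQueryBuilder("_audit"); // index name
        builder.addField("rating");  // a projected column
        builder.addLimit(5);         // a pushed-down LIMIT
        // Expected to look roughly like:
        // search index=_audit | fields rating | head 5 | table rating
        System.out.println(builder.build());
      }
    }
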
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 7845abeac5..15f8b0d579 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -40,6 +40,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
   public static final String NAME = "splunk";
   public static final int DISABLED_RECONNECT_RETRIES = 1;
   public static final int DEFAULT_WRITER_BATCH_SIZE = 1000;
+  public static final int DEFAULT_MAX_READER_COLUMNS = 1024;
+  public static final int DEFAULT_MAX_CACHE_SIZE = 10000;
+  public static final int DEFAULT_CACHE_EXPIRATION = 1024;
 
   private final String scheme;
   private final String hostname;
@@ -55,6 +58,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
   private final Integer reconnectRetries;
   private final boolean writable;
   private final Integer writerBatchSize;
+  private final Integer maxColumns;
+  private final Integer maxCacheSize;
+  private final Integer cacheExpiration;
 
   @JsonCreator
   public SplunkPluginConfig(@JsonProperty("username") String username,
@@ -74,7 +80,11 @@ public class SplunkPluginConfig extends StoragePluginConfig {
                             @JsonProperty("reconnectRetries") Integer reconnectRetries,
                             @JsonProperty("authMode") String authMode,
                             @JsonProperty("writable") boolean writable,
-                            @JsonProperty("writableBatchSize") Integer writerBatchSize) {
+                            @JsonProperty("writableBatchSize") Integer writerBatchSize,
+                            @JsonProperty("maxColumns") Integer maxColumns,
+                            @JsonProperty("maxCacheSize") Integer maxCacheSize,
+                            @JsonProperty("cacheExpiration") Integer cacheExpiration
+      ) {
     super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider),
         credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER));
     this.scheme = scheme;
@@ -91,6 +101,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     this.latestTime = latestTime == null ? "now" : latestTime;
     this.reconnectRetries = reconnectRetries;
     this.writerBatchSize = writerBatchSize;
+    this.maxColumns = maxColumns;
+    this.maxCacheSize = maxCacheSize;
+    this.cacheExpiration = cacheExpiration;
   }
 
   private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) {
@@ -109,6 +122,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     this.latestTime = that.latestTime;
     this.reconnectRetries = that.reconnectRetries;
     this.writerBatchSize = that.writerBatchSize;
+    this.maxColumns = that.maxColumns;
+    this.maxCacheSize = that.maxCacheSize;
+    this.cacheExpiration = that.cacheExpiration;
   }
 
   /**
@@ -225,6 +241,21 @@ public class SplunkPluginConfig extends StoragePluginConfig {
     return writerBatchSize != null ? writerBatchSize : DEFAULT_WRITER_BATCH_SIZE;
   }
 
+  @JsonProperty("maxColumns")
+  public int getMaxColumns() {
+    return maxColumns != null ? maxColumns : DEFAULT_MAX_READER_COLUMNS;
+  }
+
+  @JsonProperty("maxCacheSize")
+  public int getMaxCacheSize() {
+    return maxCacheSize != null ? maxCacheSize : DEFAULT_MAX_CACHE_SIZE;
+  }
+
+  @JsonProperty("cacheExpiration")
+  public int getCacheExpiration() {
+    return cacheExpiration != null ? cacheExpiration : DEFAULT_CACHE_EXPIRATION;
+  }
+
   private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) {
     return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER;
   }
@@ -250,26 +281,32 @@ public class SplunkPluginConfig extends StoragePluginConfig {
       Objects.equals(validateHostname, thatConfig.validateHostname) &&
       Objects.equals(earliestTime, thatConfig.earliestTime) &&
       Objects.equals(latestTime, thatConfig.latestTime) &&
-      Objects.equals(authMode, thatConfig.authMode);
+      Objects.equals(authMode, thatConfig.authMode) &&
+        Objects.equals(maxCacheSize, thatConfig.maxCacheSize) &&
+        Objects.equals(maxColumns, thatConfig.maxColumns) &&
+        Objects.equals(cacheExpiration, thatConfig.cacheExpiration);
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(
-      credentialsProvider,
-      scheme,
-      hostname,
-      port,
-      app,
-      owner,
-      token,
-      cookie,
-      writable,
-      validateCertificates,
-      validateHostname,
-      earliestTime,
-      latestTime,
-      authMode
+        credentialsProvider,
+        scheme,
+        hostname,
+        port,
+        app,
+        owner,
+        token,
+        cookie,
+        writable,
+        validateCertificates,
+        validateHostname,
+        earliestTime,
+        latestTime,
+        authMode,
+        cacheExpiration,
+        maxCacheSize,
+        maxColumns
     );
   }
 
@@ -290,6 +327,9 @@ public class SplunkPluginConfig extends StoragePluginConfig {
       .field("earliestTime", earliestTime)
       .field("latestTime", latestTime)
       .field("Authentication Mode", authMode)
+      .field("maxColumns", maxColumns)
+      .field("maxCacheSize", maxCacheSize)
+      .field("cacheExpiration", cacheExpiration)
       .toString();
   }
 
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
index c8cecab5d0..0e2760594d 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java
@@ -18,6 +18,9 @@
 
 package org.apache.drill.exec.store.splunk;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.collect.Sets;
 import com.splunk.IndexCollection;
 import org.apache.calcite.schema.Table;
 import org.apache.drill.common.exceptions.UserException;
@@ -28,7 +31,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.planner.logical.ModifyTableEntry;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.StorageStrategy;
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,19 +40,32 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 public class SplunkSchema extends AbstractSchema {
-  private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
+  private static final Logger logger = LoggerFactory.getLogger(SplunkSchema.class);
   private static final String SPL_TABLE_NAME = "spl";
   private final Map<String, DynamicDrillTable> activeTables = new HashMap<>();
   private final SplunkStoragePlugin plugin;
   private final String queryUserName;
+  private final Cache<String, Set<String>> cache;
+  private final boolean useCache;
 
   public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
     super(Collections.emptyList(), plugin.getName());
     this.plugin = plugin;
     this.queryUserName = queryUserName;
-
+    this.useCache = plugin.getConfig().getCacheExpiration() >= 0;
+
+    if (useCache) {
+      logger.info("Using splunk schema cache for {}", plugin.getName());
+      this.cache = Caffeine.newBuilder()
+          .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES)
+          .maximumSize(plugin.getConfig().getMaxCacheSize())
+          .build();
+    } else {
+      this.cache = null;
+    }
 
     registerIndexes();
   }
@@ -86,14 +101,19 @@ public class SplunkSchema extends AbstractSchema {
   }
 
   @Override
-  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns,
-    StorageStrategy strategy) {
+  public CreateTableEntry createNewTable(String tableName,
+      List<String> partitionColumns,
+      StorageStrategy strategy) {
     if (plugin.getConfig().isWritable() == null || (! plugin.getConfig().isWritable())) {
       throw UserException
         .dataWriteError()
         .message(plugin.getName() + " is not writable.")
         .build(logger);
     }
+    // Clear the index cache.
+    if (useCache) {
+      cache.invalidate(getNameForCache());
+    }
 
     return new CreateTableEntry() {
       @Override
@@ -122,6 +142,13 @@ public class SplunkSchema extends AbstractSchema {
 
     // Drop the index
     indexes.remove(indexName);
+
+    if (useCache) {
+      // Update the cache
+      String cacheKey = getNameForCache();
+      cache.invalidate(cacheKey);
+      cache.put(cacheKey, indexes.keySet());
+    }
   }
 
   @Override
@@ -139,6 +166,14 @@ public class SplunkSchema extends AbstractSchema {
     return SplunkPluginConfig.NAME;
   }
 
+  /**
+   * Returns the name for the cache.
+   * @return A String containing a combination of the queryUsername and sourceType (table name)
+   */
+  private String getNameForCache() {
+    return queryUserName + "-" + plugin.getName();
+  }
+
   private void registerIndexes() {
     // Verify that the connection is successful.  If not, don't register any indexes,
     // and throw an exception.
@@ -148,8 +183,24 @@ public class SplunkSchema extends AbstractSchema {
     registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(),
       new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName)));
 
+    Set<String> indexList = null;
     // Retrieve and add all other Splunk indexes
-    for (String indexName : connection.getIndexes().keySet()) {
+    // First check the cache to see if we have a list of indexes.
+    String nameKey = getNameForCache();
+    if (useCache) {
+      indexList = cache.getIfPresent(nameKey);
+    }
+
+    // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache.
+    if (indexList == null) {
+      logger.debug("Index list not in Splunk schema cache.  Retrieving from Splunk.");
+      indexList = connection.getIndexes().keySet();
+      if (useCache) {
+        cache.put(nameKey, indexList);
+      }
+    }
+
+    for (String indexName : indexList) {
       logger.debug("Registering {}", indexName);
       registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(),
         new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName)));
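
One detail of the SplunkSchema change worth calling out: the cache key from getNameForCache() combines the query user with the plugin name, so under user translation each user gets its own cached index list, and createNewTable()/dropTable() invalidate that entry so DDL is not hidden behind a stale cache. A small, hypothetical sketch of that key scheme (illustrative values only):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class CacheKeySketch {
      public static void main(String[] args) {
        Cache<String, Set<String>> cache = Caffeine.newBuilder().maximumSize(10000).build();

        String queryUserName = "alice";  // hypothetical translated user
        String pluginName = "splunk";    // storage plugin name
        String cacheKey = queryUserName + "-" + pluginName;  // same shape as getNameForCache()

        cache.put(cacheKey, new HashSet<>(Arrays.asList("_audit", "main")));

        // A CREATE TABLE or DROP TABLE through the schema invalidates the entry,
        // so the next registerIndexes() call fetches a fresh list from Splunk.
        cache.invalidate(cacheKey);
        System.out.println(cache.getIfPresent(cacheKey)); // prints null
      }
    }
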
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
index 951e05bb17..37c9fd2d54 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.collect.ImmutableSet;
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.AbstractBase;
@@ -29,7 +30,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.PhysicalVisitor;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.store.base.filter.ExprNode;
-import com.google.common.collect.ImmutableSet;
 
 import java.util.Iterator;
 import java.util.List;
@@ -38,7 +38,6 @@ import java.util.Objects;
 
 @JsonTypeName("splunk-sub-scan")
 public class SplunkSubScan extends AbstractBase implements SubScan {
-
   private final SplunkPluginConfig config;
   private final SplunkScanSpec splunkScanSpec;
   private final List<SchemaPath> columns;
diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
index 4b856182f5..e0471436fc 100644
--- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
+++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java
@@ -19,6 +19,9 @@
 package org.apache.drill.exec.store.splunk;
 
 public class SplunkUtils {
+  public static final String EARLIEST_TIME_COLUMN = "earliestTime";
+  public static final String LATEST_TIME_COLUMN = "latestTime";
+
   /**
    * These are special fields that alter the queries sent to Splunk.
    */
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index f4884ece45..5636249433 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -59,7 +59,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
         SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(),
         null,
         SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
-        StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null
+        StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null
       );
       SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
       sc.connect();
@@ -73,11 +73,14 @@ public class SplunkConnectionTest extends SplunkBaseTest {
   public void testGetIndexes() {
     SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null);
     EntityCollection<Index> indexes = sc.getIndexes();
-    assertEquals(10, indexes.size());
+    assertEquals(13, indexes.size());
 
     List<String> expectedIndexNames = new ArrayList<>();
     expectedIndexNames.add("_audit");
     expectedIndexNames.add("_configtracker");
+    expectedIndexNames.add("_dsappevent");
+    expectedIndexNames.add("_dsclient");
+    expectedIndexNames.add("_dsphonehome");
     expectedIndexNames.add("_internal");
     expectedIndexNames.add("_introspection");
     expectedIndexNames.add("_telemetry");
@@ -92,7 +95,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
       indexNames.add(index.getName());
     }
 
-    assertEquals(indexNames, expectedIndexNames);
+    assertEquals(expectedIndexNames, indexNames);
 
   }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
index fcdce8d778..470f3960cb 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java
@@ -43,14 +43,17 @@ public class SplunkIndexesTest extends SplunkBaseTest {
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
       .addRow("splunk", "summary")
       .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_dsclient")
       .addRow("splunk", "_configtracker")
       .addRow("splunk", "_thefishbucket")
-      .addRow("splunk", "_audit")
-      .addRow("splunk", "_internal")
-      .addRow("splunk", "_introspection")
       .addRow("splunk", "main")
       .addRow("splunk", "history")
+      .addRow("splunk", "_dsphonehome")
       .addRow("splunk", "spl")
+      .addRow("splunk", "_audit")
+      .addRow("splunk", "_internal")
+      .addRow("splunk", "_dsappevent")
+      .addRow("splunk", "_introspection")
       .addRow("splunk", "_telemetry")
       .build();
 
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java
index 813c665239..f5200b98c8 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java
@@ -64,6 +64,7 @@ public class SplunkLimitPushDownTest extends SplunkBaseTest {
       .sql(sql)
       .planMatcher()
       .include("Limit", "maxRecords=4")
+      .include("spl", "search index=_audit rating=52.17 | fields rating | head 5 | table rating")
       .match();
   }
 }
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 2109de28de..ebbff43b5b 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -77,14 +77,17 @@ public class SplunkPluginTest extends SplunkBaseTest {
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
       .addRow("splunk", "summary")
       .addRow("splunk", "splunklogger")
+      .addRow("splunk", "_dsclient")
       .addRow("splunk", "_configtracker")
       .addRow("splunk", "_thefishbucket")
-      .addRow("splunk", "_audit")
-      .addRow("splunk", "_internal")
-      .addRow("splunk", "_introspection")
       .addRow("splunk", "main")
       .addRow("splunk", "history")
+      .addRow("splunk", "_dsphonehome")
       .addRow("splunk", "spl")
+      .addRow("splunk", "_audit")
+      .addRow("splunk", "_internal")
+      .addRow("splunk", "_dsappevent")
+      .addRow("splunk", "_introspection")
       .addRow("splunk", "_telemetry")
       .build();
 
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 5cd228d70e..dc434c8f06 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -70,7 +70,7 @@ public class SplunkTestSuite extends ClusterTest {
   private static AtomicInteger initCount = new AtomicInteger(0);
   @ClassRule
   public static GenericContainer<?> splunk = new GenericContainer<>(
-    DockerImageName.parse("splunk/splunk:9.0.2")
+    DockerImageName.parse("splunk/splunk:9.3")
   )
     .withExposedPorts(8089, 8089)
     .withEnv("SPLUNK_START_ARGS", "--accept-license")
@@ -88,6 +88,17 @@ public class SplunkTestSuite extends ClusterTest {
         startCluster(builder);
 
         splunk.start();
+        splunk.execInContainer("if ! sudo grep -q 'minFileSize' /opt/splunk/etc/system/local/server.conf; then " +
+            "sudo chmod a+w /opt/splunk/etc/system/local/server.conf; " +
+            "sudo echo \"# disk usage processor settings\" >> /opt/splunk/etc/system/local/server.conf; " +
+            "sudo echo \"[diskUsage]\" >> /opt/splunk/etc/system/local/server.conf; " +
+            "sudo echo \"minFreeSpace = 2000\" >> /opt/splunk/etc/system/local/server.conf; " +
+            "sudo echo \"pollingFrequency = 100000\" >> /opt/splunk/etc/system/local/server.conf; " +
+            "sudo echo \"pollingTimerFrequency = 10\" >> /opt/splunk/etc/system/local/server.conf; " +
+            "sudo chmod 600 /opt/splunk/etc/system/local/server.conf; " +
+            "sudo /opt/splunk/bin/splunk restart; " +
+            "fi");
+
         String hostname = splunk.getHost();
         Integer port = splunk.getFirstMappedPort();
         StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
@@ -98,7 +109,7 @@ public class SplunkTestSuite extends ClusterTest {
           "1", "now",
           null,
           4,
-          StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null
+          StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null
         );
         SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true);
         pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG);
@@ -120,7 +131,7 @@ public class SplunkTestSuite extends ClusterTest {
           "1", "now",
           credentialsProvider,
           4,
-          AuthMode.USER_TRANSLATION.name(), true, null
+          AuthMode.USER_TRANSLATION.name(), true, null, null, null, null
         );
         SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
         pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java
index f5d399cd23..239a7930db 100644
--- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java
+++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java
@@ -76,10 +76,10 @@ public class SplunkWriterTest extends SplunkBaseTest {
       .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-      .addRow("198.35.2.120", "ACCESSORIES")
+      .addRow("198.35.2.120", "STRATEGY")
       .addRow("198.35.2.120", null)
       .addRow("198.35.2.120", null)
-      .addRow("198.35.2.120", "STRATEGY")
+      .addRow("198.35.2.120", "ACCESSORIES")
       .addRow("198.35.2.120", "NULL")
       .build();
     RowSetUtilities.verify(expected, results);
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 15a337121f..fe28d07bec 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -30,10 +30,6 @@
   <artifactId>drill-iceberg-metastore</artifactId>
   <name>Drill : Metastore : Iceberg</name>
 
-  <properties>
-    <caffeine.version>2.7.0</caffeine.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.drill</groupId>
diff --git a/pom.xml b/pom.xml
index d7ef0d0925..1b5693b34e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
     <avatica.version>1.23.0</avatica.version>
     <avro.version>1.11.4</avro.version>
     <bouncycastle.version>1.78.1</bouncycastle.version>
+    <caffeine.version>2.9.3</caffeine.version>
     <calcite.groupId>org.apache.calcite</calcite.groupId>
     <calcite.version>1.34.0</calcite.version>
     <codemodel.version>2.6</codemodel.version>

