This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 70d6a95873 DRILL-8504: Add Schema Caching to Splunk Plugin (#2929) 70d6a95873 is described below commit 70d6a95873e919ffb27880863df9c404c24d55a1 Author: Charles S. Givre <cgi...@apache.org> AuthorDate: Mon Nov 18 07:45:25 2024 -0500 DRILL-8504: Add Schema Caching to Splunk Plugin (#2929) --- .github/workflows/ci.yml | 3 +- contrib/storage-splunk/README.md | 19 ++++-- contrib/storage-splunk/pom.xml | 6 ++ .../drill/exec/store/splunk/SplunkBatchReader.java | 19 +++--- .../drill/exec/store/splunk/SplunkGroupScan.java | 58 +++++++++++++++++ .../exec/store/splunk/SplunkPluginConfig.java | 72 +++++++++++++++++----- .../drill/exec/store/splunk/SplunkSchema.java | 63 +++++++++++++++++-- .../drill/exec/store/splunk/SplunkSubScan.java | 3 +- .../drill/exec/store/splunk/SplunkUtils.java | 3 + .../exec/store/splunk/SplunkConnectionTest.java | 9 ++- .../drill/exec/store/splunk/SplunkIndexesTest.java | 9 ++- .../exec/store/splunk/SplunkLimitPushDownTest.java | 1 + .../drill/exec/store/splunk/SplunkPluginTest.java | 9 ++- .../drill/exec/store/splunk/SplunkTestSuite.java | 17 ++++- .../drill/exec/store/splunk/SplunkWriterTest.java | 4 +- metastore/iceberg-metastore/pom.xml | 4 -- pom.xml | 1 + 17 files changed, 241 insertions(+), 59 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 39c3fc0aa9..095fb44343 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,7 +55,7 @@ jobs: # added to the Runner and memory hungry tests are run separately. 
run: | sudo sh -c " - fallocate -l 4G /tmp/swapfile + fallocate -l 2G /tmp/swapfile chmod 0600 /tmp/swapfile mkswap /tmp/swapfile swapon /tmp/swapfile @@ -67,6 +67,7 @@ jobs: - name: Remove swap space run : | sudo sh -c " + free -h swapoff /tmp/swapfile rm /tmp/swapfile " diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md index 08e3d65de8..188c7247d8 100644 --- a/contrib/storage-splunk/README.md +++ b/contrib/storage-splunk/README.md @@ -4,7 +4,7 @@ This plugin enables Drill to query Splunk. ## Configuration | Option | Default | Description | Since | -|-----------------------| --------- | --------------------------------------------------------------- | ----- | +|-----------------------| --------- | --------------------------------------------------------------- |-------| | type | (none) | Set to "splunk" to use this plugin | 1.19 | | username | null | Splunk username to be used by Drill | 1.19 | | password | null | Splunk password to be used by Drill | 1.19 | @@ -13,12 +13,15 @@ This plugin enables Drill to query Splunk. 
| port | 8089 | TCP port over which Drill will connect to Splunk | 1.19 | | earliestTime | null | Global earliest record timestamp default | 1.19 | | latestTime | null | Global latest record timestamp default | 1.19 | -| app | null | The application context of the service[^1] | 2.0 | -| owner | null | The owner context of the service[^1] | 2.0 | -| token | null | A Splunk authentication token to use for the session[^2] | 2.0 | -| cookie | null | A valid login cookie | 2.0 | -| validateCertificates | true | Whether the Splunk client will validates the server's SSL cert | 2.0 | +| app | null | The application context of the service[^1] | 1.21 | +| owner | null | The owner context of the service[^1] | 1.21 | +| token | null | A Splunk authentication token to use for the session[^2] | 1.21 | +| cookie | null | A valid login cookie | 1.21 | +| validateCertificates | true | Whether the Splunk client will validate the server's SSL cert | 1.21 | | validateHostname | true | Whether the Splunk client will validate the server's host name | 1.22 | +| maxColumns | 1024 | The maximum number of columns Drill will accept from Splunk | 1.22 | +| maxCacheSize | 10000 | The maximum number of entries in Drill's schema cache for Splunk. | 1.22 | +| cacheExpiration | 1024 | The number of minutes for Drill to persist the schema cache; a negative value disables caching. | 1.22 | [^1]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Admin/Apparchitectureandobjectownership) for more information. [^2]: See [this Splunk documentation](https://docs.splunk.com/Documentation/Splunk/latest/Security/CreateAuthTokens) for more information. @@ -46,6 +49,10 @@ To bypass it by Drill please specify "reconnectRetries": 3. It allows you to ret ### User Translation The Splunk plugin supports user translation. Simply set the `authMode` parameter to `USER_TRANSLATION` and use either the plain or vault credential provider for credentials. 
+## Schema Caching +For every query that you send to Splunk from Drill, Drill will have to pull schema information from Splunk. If you have a lot of indexes, this process can cause slow planning time. To improve planning time, you can configure Drill to cache the index names so that it does not need to make additional calls to Splunk. + +There are two configuration parameters for the schema caching: `maxCacheSize` and `cacheExpiration`. The `maxCacheSize` defaults to 10,000 entries and the `cacheExpiration` defaults to 1024 minutes. To disable schema caching, simply set the `cacheExpiration` parameter to a value less than zero. ## Understanding Splunk's Data Model Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first. By default, Splunk diff --git a/contrib/storage-splunk/pom.xml b/contrib/storage-splunk/pom.xml index 9a68a6ffa0..165828aba0 100644 --- a/contrib/storage-splunk/pom.xml +++ b/contrib/storage-splunk/pom.xml @@ -58,6 +58,12 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${caffeine.version}</version> + </dependency> + <!-- Test dependencies --> <dependency> <groupId>org.apache.drill.exec</groupId> diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java index 8c3fb45bf6..a02f5d0927 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java @@ -54,9 +54,6 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> { private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class); private static final List<String> INT_COLS = 
new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount")); private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time")); - private static final String EARLIEST_TIME_COLUMN = "earliestTime"; - private static final String LATEST_TIME_COLUMN = "latestTime"; - private final SplunkPluginConfig config; private final SplunkSubScan subScan; private final List<SchemaPath> projectedColumns; @@ -88,6 +85,8 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> { RowListProcessor rowProcessor = new RowListProcessor(); csvSettings.setProcessor(rowProcessor); csvSettings.setMaxCharsPerColumn(ValueVector.MAX_BUFFER_SIZE); + // Splunk can produce a lot of columns. The SDK default maximum is 512. + csvSettings.setMaxColumns(config.getMaxColumns()); } @Override @@ -174,7 +173,7 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> { } } } - logger.debug("Time to build schmea: {} milliseconds", timer.elapsed().getNano() / 100000); + logger.debug("Time to build schema: {} milliseconds", timer.elapsed().getNano() / 100000); return builder.buildSchema(); } @@ -241,18 +240,18 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> { // Splunk searches perform best when they are time bound. This allows the user to set // default time boundaries in the config. 
These will be overwritten in filter pushdowns - if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) { - earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString(); + if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) { + earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString(); // Remove from map - filters.remove(EARLIEST_TIME_COLUMN); + filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN); } - if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) { - latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString(); + if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) { + latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString(); // Remove from map so they are not pushed down into the query - filters.remove(LATEST_TIME_COLUMN); + filters.remove(SplunkUtils.LATEST_TIME_COLUMN); } if (earliestTime == null) { diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java index 9ab7de6d47..53596a2b54 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java @@ -309,6 +309,63 @@ public class SplunkGroupScan extends AbstractGroupScan { return new SplunkGroupScan(this, columns); } + /** + * Generates the query which will be sent to Splunk. This method exists for debugging purposes so + * that the actual SPL will be recorded in the query plan. + */ + private String generateQuery() { + String earliestTime = null; + String latestTime = null; + + // Splunk searches perform best when they are time bound. This allows the user to set + // default time boundaries in the config. 
These will be overwritten in filter pushdowns + if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) { + earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString(); + } + + if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) { + latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString(); + } + + if (earliestTime == null) { + earliestTime = config.getEarliestTime(); + } + + if (latestTime == null) { + latestTime = config.getLatestTime(); + } + + // Special case: If the user wishes to send arbitrary SPL to Splunk, the user can use the "SPL" + // Index and spl filter + if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) { + if (filters != null && filters.containsKey("spl")) { + return filters.get("spl").value.value.toString(); + } + } + + SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName()); + + // Set the sourcetype + if (filters != null && filters.containsKey("sourcetype")) { + String sourcetype = filters.get("sourcetype").value.value.toString(); + builder.addSourceType(sourcetype); + } + + // Add projected columns, skipping star and specials. 
+ for (SchemaPath projectedColumn: columns) { + builder.addField(projectedColumn.getAsUnescapedPath()); + } + + // Apply filters + builder.addFilters(filters); + + // Apply limits + if (maxRecords > 0) { + builder.addLimit(maxRecords); + } + return builder.build(); + } + @Override public int hashCode() { @@ -344,6 +401,7 @@ public class SplunkGroupScan extends AbstractGroupScan { .field("scan spec", splunkScanSpec) .field("columns", columns) .field("maxRecords", maxRecords) + .field("spl", generateQuery()) .toString(); } } diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java index 7845abeac5..15f8b0d579 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java @@ -40,6 +40,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { public static final String NAME = "splunk"; public static final int DISABLED_RECONNECT_RETRIES = 1; public static final int DEFAULT_WRITER_BATCH_SIZE = 1000; + public static final int DEFAULT_MAX_READER_COLUMNS = 1024; + public static final int DEFAULT_MAX_CACHE_SIZE = 10000; + public static final int DEFAULT_CACHE_EXPIRATION = 1024; private final String scheme; private final String hostname; @@ -55,6 +58,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { private final Integer reconnectRetries; private final boolean writable; private final Integer writerBatchSize; + private final Integer maxColumns; + private final Integer maxCacheSize; + private final Integer cacheExpiration; @JsonCreator public SplunkPluginConfig(@JsonProperty("username") String username, @@ -74,7 +80,11 @@ public class SplunkPluginConfig extends StoragePluginConfig { @JsonProperty("reconnectRetries") Integer reconnectRetries, 
@JsonProperty("authMode") String authMode, @JsonProperty("writable") boolean writable, - @JsonProperty("writableBatchSize") Integer writerBatchSize) { + @JsonProperty("writableBatchSize") Integer writerBatchSize, + @JsonProperty("maxColumns") Integer maxColumns, + @JsonProperty("maxCacheSize") Integer maxCacheSize, + @JsonProperty("cacheExpiration") Integer cacheExpiration + ) { super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null, AuthMode.parseOrDefault(authMode, AuthMode.SHARED_USER)); this.scheme = scheme; @@ -91,6 +101,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { this.latestTime = latestTime == null ? "now" : latestTime; this.reconnectRetries = reconnectRetries; this.writerBatchSize = writerBatchSize; + this.maxColumns = maxColumns; + this.maxCacheSize = maxCacheSize; + this.cacheExpiration = cacheExpiration; } private SplunkPluginConfig(SplunkPluginConfig that, CredentialsProvider credentialsProvider) { @@ -109,6 +122,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { this.latestTime = that.latestTime; this.reconnectRetries = that.reconnectRetries; this.writerBatchSize = that.writerBatchSize; + this.maxColumns = that.maxColumns; + this.maxCacheSize = that.maxCacheSize; + this.cacheExpiration = that.cacheExpiration; } /** @@ -225,6 +241,21 @@ public class SplunkPluginConfig extends StoragePluginConfig { return writerBatchSize != null ? writerBatchSize : DEFAULT_WRITER_BATCH_SIZE; } + @JsonProperty("maxColumns") + public int getMaxColumns() { + return maxColumns != null ? maxColumns : DEFAULT_MAX_READER_COLUMNS; + } + + @JsonProperty("maxCacheSize") + public int getMaxCacheSize() { + return maxCacheSize != null ? maxCacheSize : DEFAULT_MAX_CACHE_SIZE; + } + + @JsonProperty("cacheExpiration") + public int getCacheExpiration() { + return cacheExpiration != null ? 
cacheExpiration : DEFAULT_CACHE_EXPIRATION; + } + private static CredentialsProvider getCredentialsProvider(CredentialsProvider credentialsProvider) { return credentialsProvider != null ? credentialsProvider : PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER; } @@ -250,26 +281,32 @@ public class SplunkPluginConfig extends StoragePluginConfig { Objects.equals(validateHostname, thatConfig.validateHostname) && Objects.equals(earliestTime, thatConfig.earliestTime) && Objects.equals(latestTime, thatConfig.latestTime) && - Objects.equals(authMode, thatConfig.authMode); + Objects.equals(authMode, thatConfig.authMode) && + Objects.equals(maxCacheSize, thatConfig.maxCacheSize) && + Objects.equals(maxColumns, thatConfig.maxColumns) && + Objects.equals(cacheExpiration, thatConfig.cacheExpiration); } @Override public int hashCode() { return Objects.hash( - credentialsProvider, - scheme, - hostname, - port, - app, - owner, - token, - cookie, - writable, - validateCertificates, - validateHostname, - earliestTime, - latestTime, - authMode + credentialsProvider, + scheme, + hostname, + port, + app, + owner, + token, + cookie, + writable, + validateCertificates, + validateHostname, + earliestTime, + latestTime, + authMode, + cacheExpiration, + maxCacheSize, + maxColumns ); } @@ -290,6 +327,9 @@ public class SplunkPluginConfig extends StoragePluginConfig { .field("earliestTime", earliestTime) .field("latestTime", latestTime) .field("Authentication Mode", authMode) + .field("maxColumns", maxColumns) + .field("maxCacheSize", maxCacheSize) + .field("cacheExpiration", cacheExpiration) .toString(); } diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java index c8cecab5d0..0e2760594d 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java +++ 
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchema.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.store.splunk; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Sets; import com.splunk.IndexCollection; import org.apache.calcite.schema.Table; import org.apache.drill.common.exceptions.UserException; @@ -28,7 +31,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.planner.logical.ModifyTableEntry; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.StorageStrategy; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,19 +40,32 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; public class SplunkSchema extends AbstractSchema { - private final static Logger logger = LoggerFactory.getLogger(SplunkSchema.class); + private static final Logger logger = LoggerFactory.getLogger(SplunkSchema.class); private static final String SPL_TABLE_NAME = "spl"; private final Map<String, DynamicDrillTable> activeTables = new HashMap<>(); private final SplunkStoragePlugin plugin; private final String queryUserName; + private final Cache<String, Set<String>> cache; + private final boolean useCache; public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) { super(Collections.emptyList(), plugin.getName()); this.plugin = plugin; this.queryUserName = queryUserName; - + this.useCache = plugin.getConfig().getCacheExpiration() >= 0; + + if (useCache) { + logger.info("Using splunk schema cache for {}", plugin.getName()); + this.cache = Caffeine.newBuilder() + .expireAfterAccess(plugin.getConfig().getCacheExpiration(), TimeUnit.MINUTES) + .maximumSize(plugin.getConfig().getMaxCacheSize()) + .build(); + } else { + this.cache = null; + } 
registerIndexes(); } @@ -86,14 +101,19 @@ public class SplunkSchema extends AbstractSchema { } @Override - public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, - StorageStrategy strategy) { + public CreateTableEntry createNewTable(String tableName, + List<String> partitionColumns, + StorageStrategy strategy) { if (plugin.getConfig().isWritable() == null || (! plugin.getConfig().isWritable())) { throw UserException .dataWriteError() .message(plugin.getName() + " is not writable.") .build(logger); } + // Clear the index cache. + if (useCache) { + cache.invalidate(getNameForCache()); + } return new CreateTableEntry() { @Override @@ -122,6 +142,13 @@ public class SplunkSchema extends AbstractSchema { // Drop the index indexes.remove(indexName); + + if (useCache) { + // Update the cache + String cacheKey = getNameForCache(); + cache.invalidate(cacheKey); + cache.put(cacheKey, indexes.keySet()); + } } @Override @@ -139,6 +166,14 @@ public class SplunkSchema extends AbstractSchema { return SplunkPluginConfig.NAME; } + /** + * Returns the key under which this schema's index list is stored in the cache. + * @return A String combining the query user name and the plugin name, separated by a hyphen + */ + private String getNameForCache() { + return queryUserName + "-" + plugin.getName(); + } + private void registerIndexes() { // Verify that the connection is successful. If not, don't register any indexes, // and throw an exception. @@ -148,8 +183,24 @@ public class SplunkSchema extends AbstractSchema { registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME, plugin.getConfig(), queryUserName))); + Set<String> indexList = null; // Retrieve and add all other Splunk indexes - for (String indexName : connection.getIndexes().keySet()) { + // First check the cache to see if we have a list of indexes. 
+ String nameKey = getNameForCache(); + if (useCache) { + indexList = cache.getIfPresent(nameKey); + } + + // If the index list is not in the cache, query Splunk, retrieve the index list and add it to the cache. + if (indexList == null) { + logger.debug("Index list not in Splunk schema cache. Retrieving from Splunk."); + indexList = connection.getIndexes().keySet(); + if (useCache) { + cache.put(nameKey, indexList); + } + } + + for (String indexName : indexList) { logger.debug("Registering {}", indexName); registerTable(indexName, new DynamicDrillTable(plugin, plugin.getName(), new SplunkScanSpec(plugin.getName(), indexName, plugin.getConfig(), queryUserName))); diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java index 951e05bb17..37c9fd2d54 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.collect.ImmutableSet; import org.apache.drill.common.PlanStringBuilder; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractBase; @@ -29,7 +30,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.store.base.filter.ExprNode; -import com.google.common.collect.ImmutableSet; import java.util.Iterator; import java.util.List; @@ -38,7 +38,6 @@ import java.util.Objects; @JsonTypeName("splunk-sub-scan") public class SplunkSubScan extends 
AbstractBase implements SubScan { - private final SplunkPluginConfig config; private final SplunkScanSpec splunkScanSpec; private final List<SchemaPath> columns; diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java index 4b856182f5..e0471436fc 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkUtils.java @@ -19,6 +19,9 @@ package org.apache.drill.exec.store.splunk; public class SplunkUtils { + public static final String EARLIEST_TIME_COLUMN = "earliestTime"; + public static final String LATEST_TIME_COLUMN = "latestTime"; + /** * These are special fields that alter the queries sent to Splunk. */ diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java index f4884ece45..5636249433 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java @@ -59,7 +59,7 @@ public class SplunkConnectionTest extends SplunkBaseTest { SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(), null, SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(), - StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null + StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null ); SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null); sc.connect(); @@ -73,11 +73,14 @@ public class SplunkConnectionTest extends SplunkBaseTest { public void testGetIndexes() { SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null); EntityCollection<Index> indexes = sc.getIndexes(); - 
assertEquals(10, indexes.size()); + assertEquals(13, indexes.size()); List<String> expectedIndexNames = new ArrayList<>(); expectedIndexNames.add("_audit"); expectedIndexNames.add("_configtracker"); + expectedIndexNames.add("_dsappevent"); + expectedIndexNames.add("_dsclient"); + expectedIndexNames.add("_dsphonehome"); expectedIndexNames.add("_internal"); expectedIndexNames.add("_introspection"); expectedIndexNames.add("_telemetry"); @@ -92,7 +95,7 @@ public class SplunkConnectionTest extends SplunkBaseTest { indexNames.add(index.getName()); } - assertEquals(indexNames, expectedIndexNames); + assertEquals(expectedIndexNames, indexNames); } } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java index fcdce8d778..470f3960cb 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkIndexesTest.java @@ -43,14 +43,17 @@ public class SplunkIndexesTest extends SplunkBaseTest { RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) .addRow("splunk", "summary") .addRow("splunk", "splunklogger") + .addRow("splunk", "_dsclient") .addRow("splunk", "_configtracker") .addRow("splunk", "_thefishbucket") - .addRow("splunk", "_audit") - .addRow("splunk", "_internal") - .addRow("splunk", "_introspection") .addRow("splunk", "main") .addRow("splunk", "history") + .addRow("splunk", "_dsphonehome") .addRow("splunk", "spl") + .addRow("splunk", "_audit") + .addRow("splunk", "_internal") + .addRow("splunk", "_dsappevent") + .addRow("splunk", "_introspection") .addRow("splunk", "_telemetry") .build(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java 
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java index 813c665239..f5200b98c8 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkLimitPushDownTest.java @@ -64,6 +64,7 @@ public class SplunkLimitPushDownTest extends SplunkBaseTest { .sql(sql) .planMatcher() .include("Limit", "maxRecords=4") + .include("spl", "search index=_audit rating=52.17 | fields rating | head 5 | table rating") .match(); } } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java index 2109de28de..ebbff43b5b 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java @@ -77,14 +77,17 @@ public class SplunkPluginTest extends SplunkBaseTest { RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) .addRow("splunk", "summary") .addRow("splunk", "splunklogger") + .addRow("splunk", "_dsclient") .addRow("splunk", "_configtracker") .addRow("splunk", "_thefishbucket") - .addRow("splunk", "_audit") - .addRow("splunk", "_internal") - .addRow("splunk", "_introspection") .addRow("splunk", "main") .addRow("splunk", "history") + .addRow("splunk", "_dsphonehome") .addRow("splunk", "spl") + .addRow("splunk", "_audit") + .addRow("splunk", "_internal") + .addRow("splunk", "_dsappevent") + .addRow("splunk", "_introspection") .addRow("splunk", "_telemetry") .build(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java index 5cd228d70e..dc434c8f06 100644 --- 
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java @@ -70,7 +70,7 @@ public class SplunkTestSuite extends ClusterTest { private static AtomicInteger initCount = new AtomicInteger(0); @ClassRule public static GenericContainer<?> splunk = new GenericContainer<>( - DockerImageName.parse("splunk/splunk:9.0.2") + DockerImageName.parse("splunk/splunk:9.3") ) .withExposedPorts(8089, 8089) .withEnv("SPLUNK_START_ARGS", "--accept-license") @@ -88,6 +88,17 @@ public class SplunkTestSuite extends ClusterTest { startCluster(builder); splunk.start(); + splunk.execInContainer("if ! sudo grep -q 'minFileSize' /opt/splunk/etc/system/local/server.conf; then " + + "sudo chmod a+w /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"# disk usage processor settings\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"[diskUsage]\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"minFreeSpace = 2000\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"pollingFrequency = 100000\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo echo \"pollingTimerFrequency = 10\" >> /opt/splunk/etc/system/local/server.conf; " + + "sudo chmod 600 /opt/splunk/etc/system/local/server.conf; " + + "sudo /opt/splunk/bin/splunk restart; " + + "fi"); + String hostname = splunk.getHost(); Integer port = splunk.getFirstMappedPort(); StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); @@ -98,7 +109,7 @@ public class SplunkTestSuite extends ClusterTest { "1", "now", null, 4, - StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null + StoragePluginConfig.AuthMode.SHARED_USER.name(), true, null, null, null, null ); SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true); pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG); @@ -120,7 +131,7 @@ public class SplunkTestSuite 
extends ClusterTest { "1", "now", credentialsProvider, 4, - AuthMode.USER_TRANSLATION.name(), true, null + AuthMode.USER_TRANSLATION.name(), true, null, null, null, null ); SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true); pluginRegistry.put("ut_splunk", SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java index f5d399cd23..239a7930db 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkWriterTest.java @@ -76,10 +76,10 @@ public class SplunkWriterTest extends SplunkBaseTest { .buildSchema(); RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema) - .addRow("198.35.2.120", "ACCESSORIES") + .addRow("198.35.2.120", "STRATEGY") .addRow("198.35.2.120", null) .addRow("198.35.2.120", null) - .addRow("198.35.2.120", "STRATEGY") + .addRow("198.35.2.120", "ACCESSORIES") .addRow("198.35.2.120", "NULL") .build(); RowSetUtilities.verify(expected, results); diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml index 15a337121f..fe28d07bec 100644 --- a/metastore/iceberg-metastore/pom.xml +++ b/metastore/iceberg-metastore/pom.xml @@ -30,10 +30,6 @@ <artifactId>drill-iceberg-metastore</artifactId> <name>Drill : Metastore : Iceberg</name> - <properties> - <caffeine.version>2.7.0</caffeine.version> - </properties> - <dependencies> <dependency> <groupId>org.apache.drill</groupId> diff --git a/pom.xml b/pom.xml index d7ef0d0925..1b5693b34e 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ <avatica.version>1.23.0</avatica.version> <avro.version>1.11.4</avro.version> <bouncycastle.version>1.78.1</bouncycastle.version> + <caffeine.version>2.9.3</caffeine.version> 
<calcite.groupId>org.apache.calcite</calcite.groupId> <calcite.version>1.34.0</calcite.version> <codemodel.version>2.6</codemodel.version>