This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 231f514c91 DRILL-8276: Add Support for User Translation for Splunk
Plugin (#2618)
231f514c91 is described below
commit 231f514c9117a574b00a3080a834fba0478cde9f
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Aug 15 13:32:38 2022 -0700
DRILL-8276: Add Support for User Translation for Splunk Plugin (#2618)
---
.../drill/exec/store/splunk/SplunkBatchReader.java | 2 +-
.../drill/exec/store/splunk/SplunkConnection.java | 24 +++--
.../drill/exec/store/splunk/SplunkGroupScan.java | 35 +++++++-
.../exec/store/splunk/SplunkPluginConfig.java | 33 +++++--
.../drill/exec/store/splunk/SplunkScanSpec.java | 11 ++-
.../exec/store/splunk/SplunkSchemaFactory.java | 37 +++++---
.../exec/store/splunk/SplunkStoragePlugin.java | 50 ++++++++++-
.../drill/exec/store/splunk/SplunkSubScan.java | 2 +-
.../exec/store/splunk/SplunkConnectionTest.java | 6 +-
.../drill/exec/store/splunk/SplunkPluginTest.java | 2 +-
.../drill/exec/store/splunk/SplunkTestSuite.java | 34 ++++++-
.../store/splunk/TestSplunkUserTranslation.java | 100 +++++++++++++++++++++
pom.xml | 2 +-
13 files changed, 301 insertions(+), 37 deletions(-)
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
index 9932801936..cba9e3d7a7 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java
@@ -81,7 +81,7 @@ public class SplunkBatchReader implements
ManagedReader<SchemaNegotiator> {
this.subScan = subScan;
this.projectedColumns = subScan.getColumns();
this.subScanSpec = subScan.getScanSpec();
- SplunkConnection connection = new SplunkConnection(config);
+ SplunkConnection connection = new SplunkConnection(config,
subScan.getUserName());
this.splunkService = connection.connect();
this.csvSettings = new CsvParserSettings();
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
index c92738eaf6..cac75b8d9f 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java
@@ -26,6 +26,7 @@ import com.splunk.SSLSecurityProtocol;
import com.splunk.Service;
import com.splunk.ServiceArgs;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,12 +44,18 @@ public class SplunkConnection {
private final Optional<UsernamePasswordCredentials> credentials;
private final String hostname;
private final int port;
+ private final String queryUserName;
private Service service;
private int connectionAttempts;
- public SplunkConnection(SplunkPluginConfig config) {
- this.credentials = config.getUsernamePasswordCredentials();
+ public SplunkConnection(SplunkPluginConfig config, String queryUserName) {
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+ } else {
+ this.credentials = config.getUsernamePasswordCredentials();
+ }
this.hostname = config.getHostname();
+ this.queryUserName = queryUserName;
this.port = config.getPort();
this.connectionAttempts = config.getReconnectRetries();
service = connect();
@@ -58,10 +65,15 @@ public class SplunkConnection {
/**
* This constructor is used for testing only
*/
- public SplunkConnection(SplunkPluginConfig config, Service service) {
- this.credentials = config.getUsernamePasswordCredentials();
+ public SplunkConnection(SplunkPluginConfig config, Service service, String
queryUserName) {
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ this.credentials = config.getUsernamePasswordCredentials(queryUserName);
+ } else {
+ this.credentials = config.getUsernamePasswordCredentials();
+ }
this.hostname = config.getHostname();
this.port = config.getPort();
+ this.queryUserName = queryUserName;
this.service = service;
}
@@ -80,11 +92,11 @@ public class SplunkConnection {
connectionAttempts--;
service = Service.connect(loginArgs);
} catch (Exception e) {
- if(connectionAttempts > 0) {
+ if (connectionAttempts > 0) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException interruptedException) {
- logger.error("Unable to wait 2 secs before next connection trey to
Splunk");
+ logger.error("Unable to wait 2 secs before next connection try to
Splunk");
}
return connect();
}
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
index 69ed312183..47ad693377 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkGroupScan.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -30,10 +31,13 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.base.filter.ExprNode;
import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -50,17 +54,20 @@ public class SplunkGroupScan extends AbstractGroupScan {
private int hashCode;
+ private MetadataProviderManager metadataProviderManager;
+
/**
* Creates a new group scan from the storage plugin.
*/
- public SplunkGroupScan (SplunkScanSpec scanSpec) {
- super("no-user");
+ public SplunkGroupScan (SplunkScanSpec scanSpec, MetadataProviderManager
metadataProviderManager) {
+ super(scanSpec.queryUserName());
this.splunkScanSpec = scanSpec;
this.config = scanSpec.getConfig();
this.columns = ALL_COLUMNS;
this.filters = null;
this.filterSelectivity = 0.0;
this.maxRecords = -1;
+ this.metadataProviderManager = metadataProviderManager;
this.scanStats = computeScanStats();
}
@@ -76,10 +83,12 @@ public class SplunkGroupScan extends AbstractGroupScan {
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
this.scanStats = that.scanStats;
+ this.hashCode = that.hashCode;
}
/**
@@ -97,8 +106,8 @@ public class SplunkGroupScan extends AbstractGroupScan {
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
-
}
/**
@@ -115,6 +124,7 @@ public class SplunkGroupScan extends AbstractGroupScan {
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.maxRecords = that.maxRecords;
+ this.metadataProviderManager = that.metadataProviderManager;
this.scanStats = computeScanStats();
}
@@ -208,6 +218,25 @@ public class SplunkGroupScan extends AbstractGroupScan {
return new SplunkGroupScan(this, maxRecords);
}
+ public TupleMetadata getSchema() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ try {
+ return metadataProviderManager.getSchemaProvider().read().getSchema();
+ } catch (IOException | NullPointerException e) {
+ return null;
+ }
+ }
+
+ @Override
+ public TableMetadataProvider getMetadataProvider() {
+ if (metadataProviderManager == null) {
+ return null;
+ }
+ return metadataProviderManager.getTableMetadataProvider();
+ }
+
@Override
public String getDigest() {
return toString();
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
index 41d134a3f8..c6e574f0b2 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java
@@ -28,6 +28,8 @@ import
org.apache.drill.common.logical.security.CredentialsProvider;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.exec.store.security.CredentialProviderUtils;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Optional;
@@ -35,13 +37,14 @@ import java.util.Optional;
@JsonTypeName(SplunkPluginConfig.NAME)
public class SplunkPluginConfig extends StoragePluginConfig {
+ private static final Logger logger =
LoggerFactory.getLogger(SplunkPluginConfig.class);
+
public static final String NAME = "splunk";
public static final int DISABLED_RECONNECT_RETRIES = 1;
private final String hostname;
private final String earliestTime;
private final String latestTime;
-
private final int port;
private final Integer reconnectRetries;
@@ -56,7 +59,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
@JsonProperty("reconnectRetries") Integer
reconnectRetries,
@JsonProperty("authMode") String authMode) {
super(CredentialProviderUtils.getCredentialsProvider(username, password,
credentialsProvider),
- credentialsProvider == null);
+ credentialsProvider == null, AuthMode.parseOrDefault(authMode,
AuthMode.SHARED_USER));
this.hostname = hostname;
this.port = port;
this.earliestTime = earliestTime;
@@ -73,6 +76,10 @@ public class SplunkPluginConfig extends StoragePluginConfig {
this.reconnectRetries = that.reconnectRetries;
}
+ /**
+ * Gets the credentials. This method is used when user translation is not
enabled.
+ * @return An {@link Optional} containing {@link
UsernamePasswordCredentials} from the config.
+ */
@JsonIgnore
public Optional<UsernamePasswordCredentials>
getUsernamePasswordCredentials() {
return new UsernamePasswordCredentials.Builder()
@@ -80,12 +87,24 @@ public class SplunkPluginConfig extends StoragePluginConfig
{
.build();
}
+ /**
+ * Gets the credentials for the given query user. This method is used when user translation is enabled.
+ * @return An {@link Optional} containing the query user's {@link UsernamePasswordCredentials}.
+ */
+ @JsonIgnore
+ public Optional<UsernamePasswordCredentials>
getUsernamePasswordCredentials(String username) {
+ return new UsernamePasswordCredentials.Builder()
+ .setCredentialsProvider(credentialsProvider)
+ .setQueryUser(username)
+ .build();
+ }
+
@JsonProperty("username")
public String getUsername() {
if (!directCredentials) {
return null;
}
- return getUsernamePasswordCredentials()
+ return getUsernamePasswordCredentials(null)
.map(UsernamePasswordCredentials::getUsername)
.orElse(null);
}
@@ -95,7 +114,7 @@ public class SplunkPluginConfig extends StoragePluginConfig {
if (!directCredentials) {
return null;
}
- return getUsernamePasswordCredentials()
+ return getUsernamePasswordCredentials(null)
.map(UsernamePasswordCredentials::getPassword)
.orElse(null);
}
@@ -141,12 +160,13 @@ public class SplunkPluginConfig extends
StoragePluginConfig {
Objects.equals(hostname, thatConfig.hostname) &&
Objects.equals(port, thatConfig.port) &&
Objects.equals(earliestTime, thatConfig.earliestTime) &&
- Objects.equals(latestTime, thatConfig.latestTime);
+ Objects.equals(latestTime, thatConfig.latestTime) &&
+ Objects.equals(authMode, thatConfig.authMode);
}
@Override
public int hashCode() {
- return Objects.hash(credentialsProvider, hostname, port, earliestTime,
latestTime);
+ return Objects.hash(credentialsProvider, hostname, port, earliestTime,
latestTime, authMode);
}
@Override
@@ -157,6 +177,7 @@ public class SplunkPluginConfig extends StoragePluginConfig
{
.field("port", port)
.field("earliestTime", earliestTime)
.field("latestTime", latestTime)
+ .field("Authentication Mode", authMode)
.toString();
}
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
index 2d736bbd4b..bd4a774161 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkScanSpec.java
@@ -29,14 +29,17 @@ public class SplunkScanSpec implements DrillTableSelection {
private final String pluginName;
private final String indexName;
private final SplunkPluginConfig config;
+ private final String queryUserName;
@JsonCreator
public SplunkScanSpec(@JsonProperty("pluginName") String pluginName,
@JsonProperty("indexName") String indexName,
- @JsonProperty("config") SplunkPluginConfig config) {
+ @JsonProperty("config") SplunkPluginConfig config,
+ @JsonProperty("queryUserName") String queryUserName) {
this.pluginName = pluginName;
this.indexName = indexName;
this.config = config;
+ this.queryUserName = queryUserName;
}
@JsonProperty("pluginName")
@@ -48,12 +51,18 @@ public class SplunkScanSpec implements DrillTableSelection {
@JsonProperty("config")
public SplunkPluginConfig getConfig() { return config; }
+ @JsonProperty("queryUserName")
+ public String queryUserName() {
+ return queryUserName;
+ }
+
@Override
public String toString() {
return new PlanStringBuilder(this)
.field("config", config)
.field("schemaName", pluginName)
.field("indexName", indexName)
+ .field("queryUserName", queryUserName)
.toString();
}
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
index bdd71f903e..8270c45d58 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSchemaFactory.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.splunk;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.AbstractSchemaFactory;
@@ -38,6 +39,7 @@ public class SplunkSchemaFactory extends
AbstractSchemaFactory {
private static final Logger logger =
LoggerFactory.getLogger(SplunkSchemaFactory.class);
private static final String SPL_TABLE_NAME = "spl";
private final SplunkStoragePlugin plugin;
+ private String queryUserName;
public SplunkSchemaFactory(SplunkStoragePlugin plugin) {
super(plugin.getName());
@@ -46,18 +48,23 @@ public class SplunkSchemaFactory extends
AbstractSchemaFactory {
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
- SplunkSchema schema = new SplunkSchema(plugin);
+ this.queryUserName = schemaConfig.getUserName();
+ SplunkSchema schema = new SplunkSchema(plugin, queryUserName);
SchemaPlus plusOfThis = parent.add(schema.getName(), schema);
}
- class SplunkSchema extends AbstractSchema {
+ static class SplunkSchema extends AbstractSchema {
private final Map<String, DynamicDrillTable> activeTables = new
HashMap<>();
private final SplunkStoragePlugin plugin;
+ private final String queryUserName;
- public SplunkSchema(SplunkStoragePlugin plugin) {
+ public SplunkSchema(SplunkStoragePlugin plugin, String queryUserName) {
super(Collections.emptyList(), plugin.getName());
this.plugin = plugin;
+ this.queryUserName = queryUserName;
+
+
registerIndexes();
}
@@ -70,7 +77,7 @@ public class SplunkSchemaFactory extends
AbstractSchemaFactory {
} else {
// Register the table
return registerTable(name, new DynamicDrillTable(plugin,
plugin.getName(),
- new SplunkScanSpec(plugin.getName(), name, plugin.getConfig())));
+ new SplunkScanSpec(plugin.getName(), name, plugin.getConfig(),
queryUserName)));
}
}
@@ -95,19 +102,29 @@ public class SplunkSchemaFactory extends
AbstractSchemaFactory {
}
private void registerIndexes() {
+ // Verify that the connection is successful. If not, don't register any
indexes,
+ // and throw an exception.
+ SplunkPluginConfig config = plugin.getConfig();
+ SplunkConnection connection;
+ try {
+ connection = new SplunkConnection(config, queryUserName);
+ connection.connect();
+ } catch (Exception e) {
+ // Catch any connection errors that may happen.
+ throw UserException.connectionError()
+ .message("Unable to connect to Splunk: " + plugin.getName() + " " +
e.getMessage())
+ .build(logger);
+ }
+
// Add default "spl" table to index list.
registerTable(SPL_TABLE_NAME, new DynamicDrillTable(plugin,
plugin.getName(),
- new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME,
plugin.getConfig())));
+ new SplunkScanSpec(plugin.getName(), SPL_TABLE_NAME,
plugin.getConfig(), queryUserName)));
// Retrieve and add all other Splunk indexes
- SplunkPluginConfig config = plugin.getConfig();
- SplunkConnection connection = new SplunkConnection(config);
- connection.connect();
-
for (String indexName : connection.getIndexes().keySet()) {
logger.debug("Registering {}", indexName);
registerTable(indexName, new DynamicDrillTable(plugin,
plugin.getName(),
- new SplunkScanSpec(plugin.getName(), indexName, config)));
+ new SplunkScanSpec(plugin.getName(), indexName, config,
queryUserName)));
}
}
}
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
index dc011a63f6..f04e25dc1a 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkStoragePlugin.java
@@ -22,20 +22,30 @@ import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.base.filter.FilterPushDownUtils;
+import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
import java.util.Set;
public class SplunkStoragePlugin extends AbstractStoragePlugin {
+ private static final Logger logger =
LoggerFactory.getLogger(SplunkStoragePlugin.class);
private final SplunkPluginConfig config;
private final SplunkSchemaFactory schemaFactory;
@@ -57,13 +67,51 @@ public class SplunkStoragePlugin extends
AbstractStoragePlugin {
@Override
public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+ // Check to see if user translation is enabled. If so, and creds are
+ // not present, then do not register any schemata. This prevents
+ // info schema errors.
+ if (config.getAuthMode() == AuthMode.USER_TRANSLATION) {
+ Optional<UsernamePasswordCredentials> userCreds =
config.getUsernamePasswordCredentials(schemaConfig.getUserName());
+ if (! userCreds.isPresent()) {
+ logger.debug(
+ "No schemas will be registered in {} for query user {}.",
+ getName(), schemaConfig.getUserName()
+ );
+ return;
+ }
+ }
schemaFactory.registerSchemas(schemaConfig, parent);
}
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions
selection,
+ SessionOptionManager options)
throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions
selection,
+ SessionOptionManager options,
MetadataProviderManager metadataProviderManager) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
+ options, metadataProviderManager);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions
selection,
+ List<SchemaPath> columns) throws
IOException {
+ return getPhysicalScan(userName, selection, columns, null, null);
+ }
@Override
public AbstractGroupScan getPhysicalScan(String userName, JSONOptions
selection) throws IOException {
+ return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS,
null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions
selection, List<SchemaPath> columns, SessionOptionManager options,
+ MetadataProviderManager
metadataProviderManager) throws IOException {
SplunkScanSpec scanSpec =
selection.getListWith(context.getLpPersistence().getMapper(), new
TypeReference<SplunkScanSpec>() {});
- return new SplunkGroupScan(scanSpec);
+ return new SplunkGroupScan(scanSpec, metadataProviderManager);
}
@Override
diff --git
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
index 21b1237420..e90bd2aa9d 100644
---
a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
+++
b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkSubScan.java
@@ -52,7 +52,7 @@ public class SplunkSubScan extends AbstractBase implements
SubScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("filters") Map<String, ExprNode.ColRelOpConstNode> filters,
@JsonProperty("maxRecords") int maxRecords) {
- super("user");
+ super(splunkScanSpec.queryUserName());
this.config = config;
this.splunkScanSpec = splunkScanSpec;
this.columns = columns;
diff --git
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
index 4e010b2201..22cc0a5942 100644
---
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
+++
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java
@@ -36,7 +36,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
@Test
public void testConnection() {
- SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+ SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG,
null);
sc.connect();
}
@@ -54,7 +54,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries(),
StoragePluginConfig.AuthMode.SHARED_USER.name()
);
- SplunkConnection sc = new SplunkConnection(invalidSplunkConfig);
+ SplunkConnection sc = new SplunkConnection(invalidSplunkConfig, null);
sc.connect();
fail();
} catch (UserException e) {
@@ -64,7 +64,7 @@ public class SplunkConnectionTest extends SplunkBaseTest {
@Test
public void testGetIndexes() {
- SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG);
+ SplunkConnection sc = new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG,
null);
EntityCollection<Index> indexes = sc.getIndexes();
assertEquals(9, indexes.size());
diff --git
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
index 7d52008d06..2818018d79 100644
---
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
+++
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java
@@ -311,7 +311,7 @@ public class SplunkPluginTest extends SplunkBaseTest {
.thenThrow(new RuntimeException("Fail second connection to Splunk"))
.thenThrow(new RuntimeException("Fail third connection to Splunk"))
.thenReturn(new Service(loginArgs)); // fourth connection is
successful
- new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // it will fail, in
case "reconnectRetries": 1 is specified in configs
+ new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG, null); // it will
fail, in case "reconnectRetries": 1 is specified in configs
splunk.verify(
() -> Service.connect(loginArgs),
times(4)
diff --git
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
index 0abf92aa66..2c07ac4c88 100644
---
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
+++
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java
@@ -20,8 +20,11 @@ package org.apache.drill.exec.store.splunk;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -34,8 +37,12 @@ import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_2;
+
@RunWith(Suite.class)
@Suite.SuiteClasses({
@@ -44,7 +51,8 @@ import java.util.concurrent.atomic.AtomicInteger;
SplunkLimitPushDownTest.class,
SplunkIndexesTest.class,
SplunkPluginTest.class,
- SplunkTestSplunkUtils.class
+ SplunkTestSplunkUtils.class,
+ TestSplunkUserTranslation.class
})
@Category({SlowTest.class})
@@ -52,6 +60,8 @@ public class SplunkTestSuite extends ClusterTest {
private static final Logger logger =
LoggerFactory.getLogger(SplunkTestSuite.class);
protected static SplunkPluginConfig SPLUNK_STORAGE_PLUGIN_CONFIG = null;
+
+ protected static SplunkPluginConfig
SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = null;
public static final String SPLUNK_LOGIN = "admin";
public static final String SPLUNK_PASS = "password";
@@ -67,7 +77,12 @@ public class SplunkTestSuite extends ClusterTest {
public static void initSplunk() throws Exception {
synchronized (SplunkTestSuite.class) {
if (initCount.get() == 0) {
- startCluster(ClusterFixture.builder(dirTestWatcher));
+ ClusterFixtureBuilder builder = new
ClusterFixtureBuilder(dirTestWatcher)
+ .configProperty(ExecConstants.HTTP_ENABLE, true)
+ .configProperty(ExecConstants.HTTP_PORT_HUNT, true)
+ .configProperty(ExecConstants.IMPERSONATION_ENABLED, true);
+ startCluster(builder);
+
splunk.start();
String hostname = splunk.getHost();
Integer port = splunk.getFirstMappedPort();
@@ -79,6 +94,19 @@ public class SplunkTestSuite extends ClusterTest {
runningSuite = true;
logger.info("Take a time to ready more Splunk events (10 sec)...");
Thread.sleep(10000);
+
+
+ PlainCredentialsProvider credentialsProvider = new
PlainCredentialsProvider(new HashMap<>());
+ // Add authorized user
+ credentialsProvider.setUserCredentials(SPLUNK_LOGIN, SPLUNK_PASS,
TEST_USER_1);
+ // Add unauthorized user
+ credentialsProvider.setUserCredentials("nope", "no way dude",
TEST_USER_2);
+
+ SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION = new
SplunkPluginConfig(null, null, hostname, port, "1", "now",
+ credentialsProvider, 4, AuthMode.USER_TRANSLATION.name());
+ SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION.setEnabled(true);
+ pluginRegistry.put("ut_splunk",
SPLUNK_STORAGE_PLUGIN_CONFIG_WITH_USER_TRANSLATION);
+
}
initCount.incrementAndGet();
runningSuite = true;
diff --git
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
new file mode 100644
index 0000000000..7e24e0c720
--- /dev/null
+++
b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/TestSplunkUserTranslation.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER;
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.ADMIN_USER_PASSWORD;
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1;
+import static
org.apache.drill.exec.rpc.user.security.testing.UserAuthenticatorTestImpl.TEST_USER_1_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Category({SlowTest.class})
+public class TestSplunkUserTranslation extends SplunkBaseTest {
+
+ @Test
+ public void testInfoSchemaQueryWithMissingCredentials() throws Exception {
+ // This test validates that info schema queries omit plugins the user lacks credentials for.
+ // This user should not see the ut_splunk schema because they do not have valid credentials.
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk%'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+ }
+
+ @Test
+ public void testInfoSchemaQueryWithValidCredentials() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "SHOW DATABASES WHERE schema_name LIKE '%splunk'";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(2, results.rowCount());
+ }
+
+ @Test
+ public void testSplunkQueryWithUserTranslation() throws Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, TEST_USER_1)
+ .property(DrillProperties.PASSWORD, TEST_USER_1_PASSWORD)
+ .build();
+
+ String sql = "SELECT acceleration_id, action, add_offset, add_timestamp
FROM ut_splunk._audit LIMIT 2";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(2, results.rowCount());
+ }
+
+ @Test
+ public void testSplunkQueryWithUserTranslationAndInvalidCredentials() throws
Exception {
+ ClientFixture client = cluster
+ .clientBuilder()
+ .property(DrillProperties.USER, ADMIN_USER)
+ .property(DrillProperties.PASSWORD, ADMIN_USER_PASSWORD)
+ .build();
+
+ String sql = "SELECT acceleration_id, action, add_offset, add_timestamp
FROM ut_splunk._audit LIMIT 2";
+ try {
+ client.queryBuilder().sql(sql).rowSet();
+ fail();
+ } catch (UserRemoteException e) {
+ assertTrue(e.getMessage().contains("Schema [[ut_splunk]] is not valid"));
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 16193cb2a4..9786377d59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,7 +121,7 @@
<commons.cli.version>1.4</commons.cli.version>
<snakeyaml.version>1.26</snakeyaml.version>
<commons.lang3.version>3.10</commons.lang3.version>
- <testcontainers.version>1.16.3</testcontainers.version>
+ <testcontainers.version>1.17.3</testcontainers.version>
<typesafe.config.version>1.4.2</typesafe.config.version>
<commons.codec.version>1.14</commons.codec.version>
<xerces.version>2.12.2</xerces.version>