This is an automated email from the ASF dual-hosted git repository. vitalii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 958d849 DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (#2278) 958d849 is described below commit 958d849144a662a781e4d7d59adbf3300ad3bdea Author: Vitalii Diravka <vita...@apache.org> AuthorDate: Tue Jul 27 16:21:04 2021 +0300 DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently (#2278) * DRILL-7975: Connection to Splunk Drill Storage Plugin fails intermittently * Changes according to review * Removing reconnectRetries from all plugins. Removing deserializing splunk json config in tests. * Revert CI Direct Memory property: 3200Mb -> 2500Mb --- .../exec/store/druid/DruidStoragePluginConfig.java | 1 - .../apache/drill/store/kudu/TestKuduConnect.java | 2 +- contrib/storage-splunk/README.md | 11 +++++---- .../drill/exec/store/splunk/SplunkConnection.java | 14 ++++++++++- .../exec/store/splunk/SplunkPluginConfig.java | 10 +++++++- .../main/resources/bootstrap-storage-plugins.json | 3 ++- .../exec/store/splunk/SplunkConnectionTest.java | 3 ++- .../drill/exec/store/splunk/SplunkPluginTest.java | 27 ++++++++++++++++++++++ .../drill/exec/store/splunk/SplunkTestSuite.java | 3 ++- .../planner/sql/handlers/ShowTablesHandler.java | 2 +- .../drill/exec/store/ischema/InfoSchemaConfig.java | 1 - .../AbstractSecuredStoragePluginConfig.java | 9 ++++---- pom.xml | 10 ++++++-- 13 files changed, 76 insertions(+), 20 deletions(-) diff --git a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java index 6bf13a1..3835501 100644 --- a/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java +++ b/contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java @@ -42,7 +42,6 @@ public class DruidStoragePluginConfig extends StoragePluginConfig { 
@JsonProperty("brokerAddress") String brokerAddress, @JsonProperty("coordinatorAddress") String coordinatorAddress, @JsonProperty("averageRowSizeBytes") Integer averageRowSizeBytes) { - this.brokerAddress = brokerAddress; this.coordinatorAddress = coordinatorAddress; this.averageRowSizeBytes = diff --git a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java index f75e983..3f97d3d 100644 --- a/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java +++ b/contrib/storage-kudu/src/test/java/org/apache/drill/store/kudu/TestKuduConnect.java @@ -40,7 +40,7 @@ import org.apache.kudu.client.RowResultIterator; import org.apache.kudu.client.SessionConfiguration; import org.junit.experimental.categories.Category; -@Ignore("requires remote kudu server") +@Ignore("requires remote kudu server") // TODO: can be rewritten by leveraging kudu docker container: DRILL-7977 @Category(KuduStorageTest.class) public class TestKuduConnect extends BaseTest { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestKuduConnect.class); diff --git a/contrib/storage-splunk/README.md b/contrib/storage-splunk/README.md index a8fd0e7..f28c4b8 100644 --- a/contrib/storage-splunk/README.md +++ b/contrib/storage-splunk/README.md @@ -3,21 +3,24 @@ This plugin enables Drill to query Splunk. ## Configuration To connect Drill to Splunk, create a new storage plugin with the following configuration: - -To successfully connect, Splunk uses port `8089` for interfaces. This port must be open for Drill to query Splunk. - ```json { "type":"splunk", + "enabled": false, "username": "admin", "password": "changeme", "hostname": "localhost", "port": 8089, "earliestTime": "-14d", "latestTime": "now", - "enabled": false + "reconnectRetries": 3 } ``` +To successfully connect, Splunk uses port `8089` for interfaces. 
This port must be open for Drill to query Splunk. + +Connections to Splunk sometimes fail intermittently; see: +https://github.com/splunk/splunk-sdk-java/issues/62 <br> +To work around this in Drill, specify "reconnectRetries": 3. It allows Drill to retry the connection several times. ## Understanding Splunk's Data Model Splunk's primary use case is analyzing event logs with a timestamp. As such, data is indexed by the timestamp, with the most recent data being indexed first. By default, Splunk diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java index 05988c4..b262d7d 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkConnection.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.store.security.UsernamePasswordCredentials; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; + /** * This class wraps the functionality of the Splunk connection for Drill. 
*/ @@ -41,11 +43,13 @@ public class SplunkConnection { private final String hostname; private final int port; private Service service; + private int connectionAttempts; public SplunkConnection(SplunkPluginConfig config) { this.credentials = config.getUsernamePasswordCredentials(); this.hostname = config.getHostname(); this.port = config.getPort(); + this.connectionAttempts = config.getReconnectRetries(); service = connect(); ConfCollection confs = service.getConfs(); } @@ -71,10 +75,18 @@ public class SplunkConnection { loginArgs.setPort(port); loginArgs.setPassword(credentials.getPassword()); loginArgs.setUsername(credentials.getUsername()); - try { + connectionAttempts--; service = Service.connect(loginArgs); } catch (Exception e) { + if(connectionAttempts > 0) { + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException interruptedException) { + logger.error("Unable to wait 2 secs before next connection trey to Splunk"); + } + return connect(); + } throw UserException .connectionError() .message("Unable to connect to Splunk at %s:%s", hostname, port) diff --git a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java index c122a98..54c6564 100644 --- a/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java +++ b/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkPluginConfig.java @@ -34,12 +34,14 @@ import java.util.Objects; public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig { public static final String NAME = "splunk"; + public static final int DISABLED_RECONNECT_RETRIES = 1; private final String hostname; private final String earliestTime; private final String latestTime; private final int port; + private final Integer reconnectRetries; @JsonCreator public SplunkPluginConfig(@JsonProperty("username") String username, @@ 
-48,13 +50,15 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig { @JsonProperty("port") int port, @JsonProperty("earliestTime") String earliestTime, @JsonProperty("latestTime") String latestTime, - @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider) { + @JsonProperty("credentialsProvider") CredentialsProvider credentialsProvider, + @JsonProperty("reconnectRetries") Integer reconnectRetries) { super(CredentialProviderUtils.getCredentialsProvider(username, password, credentialsProvider), credentialsProvider == null); this.hostname = hostname; this.port = port; this.earliestTime = earliestTime; this.latestTime = latestTime == null ? "now" : latestTime; + this.reconnectRetries = reconnectRetries; } @JsonIgnore @@ -98,6 +102,10 @@ public class SplunkPluginConfig extends AbstractSecuredStoragePluginConfig { return latestTime; } + @JsonProperty("reconnectRetries") + public int getReconnectRetries() { + return reconnectRetries != null ? reconnectRetries : DISABLED_RECONNECT_RETRIES; + } @Override public boolean equals(Object that) { diff --git a/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json index 8a55547..db66657 100644 --- a/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-splunk/src/main/resources/bootstrap-storage-plugins.json @@ -2,13 +2,14 @@ "storage":{ "splunk" : { "type":"splunk", + "enabled": false, "username": "admin", "password": "changeme", "hostname": "localhost", "port": 8089, "earliestTime": "-14d", "latestTime": "now", - "enabled": false + "reconnectRetries": 2 } } } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java index 67006d5..d497c6e 100644 --- 
a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkConnectionTest.java @@ -49,7 +49,8 @@ public class SplunkConnectionTest extends SplunkBaseTest { SPLUNK_STORAGE_PLUGIN_CONFIG.getPort(), SPLUNK_STORAGE_PLUGIN_CONFIG.getEarliestTime(), SPLUNK_STORAGE_PLUGIN_CONFIG.getLatestTime(), - null + null, + SPLUNK_STORAGE_PLUGIN_CONFIG.getReconnectRetries() ); SplunkConnection sc = new SplunkConnection(invalidSplunkConfig); sc.connect(); diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java index 254a96c..a9e7aff 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkPluginTest.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.store.splunk; +import com.splunk.Service; +import com.splunk.ServiceArgs; import org.apache.drill.categories.SlowTest; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos; @@ -30,10 +32,14 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runners.MethodSorters; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import static org.apache.drill.exec.store.splunk.SplunkTestSuite.SPLUNK_STORAGE_PLUGIN_CONFIG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.times; @FixMethodOrder(MethodSorters.JVM) @Category({SlowTest.class}) @@ -290,4 +296,25 @@ public class SplunkPluginTest extends SplunkBaseTest { int cnt = queryBuilder().physical(plan).singletonInt(); assertEquals("Counts should match", 1, cnt); 
} + + @Test + public void testReconnectRetries() { + try (MockedStatic<Service> splunk = Mockito.mockStatic(Service.class)) { + ServiceArgs loginArgs = new ServiceArgs(); + loginArgs.setHost(SPLUNK_STORAGE_PLUGIN_CONFIG.getHostname()); + loginArgs.setPort(SPLUNK_STORAGE_PLUGIN_CONFIG.getPort()); + loginArgs.setPassword(SPLUNK_STORAGE_PLUGIN_CONFIG.getPassword()); + loginArgs.setUsername(SPLUNK_STORAGE_PLUGIN_CONFIG.getUsername()); + splunk.when(() -> Service.connect(loginArgs)) + .thenThrow(new RuntimeException("Fail first connection to Splunk")) + .thenThrow(new RuntimeException("Fail second connection to Splunk")) + .thenThrow(new RuntimeException("Fail third connection to Splunk")) + .thenReturn(new Service(loginArgs)); // fourth connection is successful + new SplunkConnection(SPLUNK_STORAGE_PLUGIN_CONFIG); // it will fail, in case "reconnectRetries": 1 is specified in configs + splunk.verify( + () -> Service.connect(loginArgs), + times(4) + ); + } + } } diff --git a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java index 8e24587..23082e3 100644 --- a/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java +++ b/contrib/storage-splunk/src/test/java/org/apache/drill/exec/store/splunk/SplunkTestSuite.java @@ -71,7 +71,8 @@ public class SplunkTestSuite extends ClusterTest { String hostname = splunk.getHost(); Integer port = splunk.getFirstMappedPort(); StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage(); - SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now", null); + SPLUNK_STORAGE_PLUGIN_CONFIG = new SplunkPluginConfig(SPLUNK_LOGIN, SPLUNK_PASS, hostname, port, "1", "now", + null, 4); SPLUNK_STORAGE_PLUGIN_CONFIG.setEnabled(true); pluginRegistry.put(SplunkPluginConfig.NAME, SPLUNK_STORAGE_PLUGIN_CONFIG); 
runningSuite = true; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java index c726d30..42cb7e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java @@ -67,7 +67,7 @@ public class ShowTablesHandler extends DefaultSqlHandler { if (schemaPlus == null) { throw UserException.validationError() - .message(String.format("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames))) + .message("Invalid schema name [%s]", SchemaUtilites.getSchemaPath(schemaNames)) .build(logger); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java index 035787e..59bfd18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java @@ -22,7 +22,6 @@ import org.apache.drill.common.logical.StoragePluginConfig; public class InfoSchemaConfig extends StoragePluginConfig { public static final String NAME = "ischema"; - public static final InfoSchemaConfig INSTANCE = new InfoSchemaConfig(); @Override diff --git a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java index 441ce73..ce8d8dc 100644 --- a/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java +++ b/logical/src/main/java/org/apache/drill/common/logical/AbstractSecuredStoragePluginConfig.java @@ -25,16 +25,15 @@ public abstract class AbstractSecuredStoragePluginConfig extends 
StoragePluginCo protected final CredentialsProvider credentialsProvider; protected boolean directCredentials; + public AbstractSecuredStoragePluginConfig() { + this(PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER, true); + } + public AbstractSecuredStoragePluginConfig(CredentialsProvider credentialsProvider, boolean directCredentials) { this.credentialsProvider = credentialsProvider; this.directCredentials = directCredentials; } - public AbstractSecuredStoragePluginConfig() { - this.credentialsProvider = PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER; - this.directCredentials = true; - } - public CredentialsProvider getCredentialsProvider() { if (directCredentials) { return null; diff --git a/pom.xml b/pom.xml index f7cf7f3..f5767f4 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ <wiremock.standalone.version>2.23.2</wiremock.standalone.version> <jmockit.version>1.47</jmockit.version> <logback.version>1.2.3</logback.version> - <mockito.version>3.11.0</mockito.version> + <mockito.version>3.11.2</mockito.version> <!-- Currently Hive storage plugin only supports Apache Hive 3.1.2 or vendor specific variants of the Apache Hive 2.3.2. If the version is changed, make sure the jars and their dependencies are updated, @@ -1117,7 +1117,13 @@ long as Mockito _contains_ older Hamcrest classes. See DRILL-2130. --> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> - <version>2.23.4</version> + <version>${mockito.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-inline</artifactId> + <version>${mockito.version}</version> <scope>test</scope> </dependency> <dependency>