This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new e8c1dc6 DRILL-8181: Accept nullable args in HTTP plugin UDFs, fix HikariCP default parm names (#2509)
e8c1dc6 is described below
commit e8c1dc62ac2449c29385140ddcea4dab73334b0f
Author: James Turton <[email protected]>
AuthorDate: Thu Mar 31 15:49:49 2022 +0200
DRILL-8181: Accept nullable args in HTTP plugin UDFs, fix HikariCP default parm names (#2509)
---
.../exec/store/http/udfs/HttpHelperFunctions.java | 60 +++++++++++++++++++---
.../drill/exec/store/http/util/SimpleHttp.java | 60 +++++++++-------------
.../exec/store/http/TestHttpUDFFunctions.java | 30 ++++++++---
.../drill/exec/store/jdbc/JdbcStoragePlugin.java | 19 +++----
.../org/apache/drill/exec/ssl/SSLConfigServer.java | 3 ++
5 files changed, 113 insertions(+), 59 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 256fbee..bdfd0d3 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
@@ -42,7 +43,7 @@ public class HttpHelperFunctions {
VarCharHolder rawInput;
@Param
- VarCharHolder[] inputReaders;
+ NullableVarCharHolder[] inputReaders;
@Output
ComplexWriter writer;
@@ -73,6 +74,16 @@ public class HttpHelperFunctions {
// Process Positional Arguments
java.util.List args =
org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
+ // If the arg list is null, indicating at least one null arg, return an
empty map
+ // as an approximation of null-if-null handling.
+ if (args == null) {
+ // Return empty map
+ org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter
mapWriter = writer.rootAsMap();
+ mapWriter.start();
+ mapWriter.end();
+ return;
+ }
+
String finalUrl =
org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url,
args);
// Make the API call
@@ -108,7 +119,7 @@ public class HttpHelperFunctions {
VarCharHolder rawInput;
@Param
- VarCharHolder[] inputReaders;
+ NullableVarCharHolder[] inputReaders;
@Output
ComplexWriter writer;
@@ -125,6 +136,12 @@ public class HttpHelperFunctions {
@Workspace
org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+ @Workspace
+ org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
+
+ @Workspace
+ org.apache.drill.exec.store.http.HttpApiConfig endpointConfig;
+
@Override
public void setup() {
jsonReader = new
org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
@@ -133,16 +150,47 @@ public class HttpHelperFunctions {
.allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
.enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
.build();
+
+ String schemaPath =
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start,
rawInput.end, rawInput.buffer);
+ // Get the plugin name and endpoint name
+ String[] parts = schemaPath.split("\\.");
+ if (parts.length < 2) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException(
+ "You must call this function with a connection name and endpoint."
+ );
+ }
+ String pluginName = parts[0], endpointName = parts[1];
+
+ plugin =
org.apache.drill.exec.store.http.util.SimpleHttp.getStoragePlugin(
+ drillbitContext,
+ pluginName
+ );
+ endpointConfig =
org.apache.drill.exec.store.http.util.SimpleHttp.getEndpointConfig(
+ endpointName,
+ plugin.getConfig()
+ );
}
@Override
public void eval() {
- // Get the plugin name
- String pluginName =
org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start,
rawInput.end, rawInput.buffer);
-
// Process Positional Arguments
java.util.List args =
org.apache.drill.exec.store.http.util.SimpleHttp.buildParameterList(inputReaders);
- String results =
org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(pluginName,
drillbitContext, args);
+ // If the arg list is null, indicating at least one null arg, return an
empty map
+ // as an approximation of null-if-null handling.
+ if (args == null) {
+ // Return empty map
+ org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter
mapWriter = writer.rootAsMap();
+ mapWriter.start();
+ mapWriter.end();
+ return;
+ }
+
+ String results =
org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(
+ plugin,
+ endpointConfig,
+ drillbitContext,
+ args
+ );
// If the result string is null or empty, return an empty map
if (results == null || results.length() == 0) {
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index b5812c1..199635b 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -37,7 +37,7 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.oauth.PersistentTokenTable;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePlugin;
@@ -796,36 +796,36 @@ public class SimpleHttp {
/**
* Accepts a list of input readers and converts that into an ArrayList of
Strings
* @param inputReaders The array of FieldReaders
- * @return A List of Strings containing the values from the FieldReaders.
+ * @return A List of Strings containing the values from the FieldReaders or
null
+ * to indicate that at least one null is present in the arguments.
*/
- public static List<String> buildParameterList(VarCharHolder[] inputReaders) {
+ public static List<String> buildParameterList(NullableVarCharHolder[]
inputReaders) {
if (inputReaders == null || inputReaders.length == 0) {
+ // no args provided
return Collections.emptyList();
}
List<String> inputArguments = new ArrayList<>();
for (int i = 0; i < inputReaders.length; i++) {
+ if (inputReaders[i].buffer == null) {
+ // at least one null arg provided
+ return null;
+ }
+
inputArguments.add(StringFunctionHelpers.getStringFromVarCharHolder(inputReaders[i]));
}
return inputArguments;
}
+/*
public static HttpStoragePluginConfig getPluginConfig(String name,
DrillbitContext context) throws PluginException {
HttpStoragePlugin httpStoragePlugin = getStoragePlugin(context, name);
return httpStoragePlugin.getConfig();
}
+*/
- public static HttpApiConfig getEndpointConfig(String name,
HttpStoragePluginConfig pluginConfig) {
- // Get the plugin name and endpoint name
- String[] parts = name.split("\\.");
- if (parts.length < 2) {
- throw UserException.functionError()
- .message("You must call this function with a connection name and
endpoint.")
- .build(logger);
- }
-
- String endpoint = parts[1];
+ public static HttpApiConfig getEndpointConfig(String endpoint,
HttpStoragePluginConfig pluginConfig) {
HttpApiConfig endpointConfig = pluginConfig.getConnection(endpoint);
if (endpointConfig == null) {
throw UserException.functionError()
@@ -840,7 +840,7 @@ public class SimpleHttp {
return endpointConfig;
}
- private static HttpStoragePlugin getStoragePlugin(DrillbitContext context,
String pluginName) {
+ public static HttpStoragePlugin getStoragePlugin(DrillbitContext context,
String pluginName) {
StoragePluginRegistry storage = context.getStorage();
try {
StoragePlugin pluginInstance = storage.getPlugin(pluginName);
@@ -868,34 +868,20 @@ public class SimpleHttp {
* This function makes an API call and returns a string of the parsed
results. It is used in the http_get() UDF
* and retrieves all the configuration parameters contained in the storage
plugin and endpoint configuration. The exception
* is pagination. This does not support pagination.
- * @param schemaPath The path of storage_plugin.endpoint from which the data
will be retrieved
+ * @param plugin The HTTP storage plugin upon which the API call is based.
+ * @param endpointConfig The configuration of the API endpoint upon which
the API call is based.
* @param context {@link DrillbitContext} The context from the current query
* @param args An optional list of parameter arguments which will be
included in the URL
* @return A String of the results.
*/
- public static String makeAPICall(String schemaPath, DrillbitContext context,
List<String> args) {
+ public static String makeAPICall(
+ HttpStoragePlugin plugin,
+ HttpApiConfig endpointConfig,
+ DrillbitContext context,
+ List<String> args
+ ) {
HttpStoragePluginConfig pluginConfig;
- HttpApiConfig endpointConfig;
-
- // Get the plugin name and endpoint name
- String[] parts = schemaPath.split("\\.");
- if (parts.length < 2) {
- throw UserException.functionError()
- .message("You must call this function with a connection name and
endpoint.")
- .build(logger);
- }
- String pluginName = parts[0];
-
- HttpStoragePlugin plugin = getStoragePlugin(context, pluginName);
-
- try {
- pluginConfig = getPluginConfig(pluginName, context);
- endpointConfig = getEndpointConfig(schemaPath, pluginConfig);
- } catch (PluginException e) {
- throw UserException.functionError()
- .message("Could not access plugin " + pluginName)
- .build(logger);
- }
+ pluginConfig = plugin.getConfig();
// Get proxy settings
HttpProxyConfig proxyConfig = SimpleHttp.getProxySettings(pluginConfig,
context.getConfig(), endpointConfig.getHttpUrl());
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 0439979..8f17b26 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -47,9 +47,9 @@ import static org.junit.Assert.fail;
public class TestHttpUDFFunctions extends ClusterTest {
- private static final int MOCK_SERVER_PORT = 47770;
+ private static final int MOCK_SERVER_PORT = 47771;
private static String TEST_JSON_RESPONSE;
- private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT +
"/";
+ private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
@BeforeClass
@@ -58,7 +58,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
TEST_JSON_RESPONSE =
Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"),
Charsets.UTF_8).read();
HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
- .url("http://localhost:47770/orgs/{org}/repos")
+ .url(String.format("%s/orgs/{org}/repos", DUMMY_URL))
.method("GET")
.params(Arrays.asList("org", "lng", "date"))
.dataPath("results")
@@ -89,7 +89,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("GET", recordedRequest.getMethod());
- assertEquals("http://localhost:47770/",
recordedRequest.getRequestUrl().toString());
+ assertEquals(String.format("%s/", DUMMY_URL),
recordedRequest.getRequestUrl().toString());
}
}
@@ -97,7 +97,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
public void testHttpGetWithParams() throws Exception {
try (MockWebServer server = startServer()) {
server.enqueue(new
MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
- String sql = "SELECT http_get('" + DUMMY_URL + "{p1}/{p2}', 'param1',
'param2') AS result FROM (values(1))";
+ String sql = "SELECT http_get('" + DUMMY_URL + "/{p1}/{p2}', 'param1',
'param2') AS result FROM (values(1))";
RowSet results = client.queryBuilder().sql(sql).rowSet();
assertEquals(1, results.rowCount());
@@ -105,7 +105,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("GET", recordedRequest.getMethod());
- assertEquals("http://localhost:47770/param1/param2",
recordedRequest.getRequestUrl().toString());
+ assertEquals(String.format("%s/param1/param2", DUMMY_URL),
recordedRequest.getRequestUrl().toString());
}
}
@@ -121,7 +121,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
RecordedRequest recordedRequest = server.takeRequest();
assertEquals("GET", recordedRequest.getMethod());
- assertEquals("http://localhost:47770/orgs/apache/repos",
recordedRequest.getRequestUrl().toString());
+ assertEquals(String.format("%s/orgs/apache/repos", DUMMY_URL),
recordedRequest.getRequestUrl().toString());
}
}
@@ -137,6 +137,22 @@ public class TestHttpUDFFunctions extends ClusterTest {
}
@Test
+ public void testNullParam() throws Exception {
+ // any null parameter results in an empty map
+ String sql = "SELECT http_get('" + DUMMY_URL + "/{p1}/{p2}', 'param1',
null) AS result FROM (values(1))";
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+ assertEquals(0, results.container().getLast().getField().getChildCount());
+ results.clear();
+
+ sql = "SELECT http_request('local.github', null) AS result FROM
(values(1))";
+ results = client.queryBuilder().sql(sql).rowSet();
+ assertEquals(1, results.rowCount());
+ assertEquals(0, results.container().getLast().getField().getChildCount());
+ results.clear();
+ }
+
+ @Test
public void testPositionalReplacement() {
String url = "http://somesite.com/{p1}/{p2}/path/{}";
List<String> params = new ArrayList<>();
diff --git a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
index 8ae93a3..32d4d52 100644
--- a/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
+++ b/contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcStoragePlugin.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.jdbc;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
@@ -134,16 +135,16 @@ public class JdbcStoragePlugin extends AbstractStoragePlugin {
storage config.
*/
- // maximum amount of time that a connection is allowed to sit idle in
the pool, 0 = forever
- properties.setProperty("dataSource.idleTimeout", String.format("%d000",
1*60*60)); // 1 hour
- // how frequently HikariCP will attempt to keep a connection alive, 0 =
disabled
- properties.setProperty("dataSource.keepaliveTime",
String.format("%d000", 0));
- // maximum lifetime of a connection in the pool, 0 = forever
- properties.setProperty("dataSource.maxLifetime", String.format("%d000",
6*60*60)); // 6 hours
- // minimum number of idle connections that HikariCP tries to maintain in
the pool, 0 = none
- properties.setProperty("dataSource.minimumIdle", "0");
+ // maximum amount of time that a connection is allowed to sit idle in
the pool, 0 ⇒ forever
+ properties.setProperty("idleTimeout",
String.valueOf(TimeUnit.HOURS.toMillis(2)));
+ // how frequently HikariCP will attempt to keep a connection alive, 0 ⇒
disabled
+ properties.setProperty("keepaliveTime",
String.valueOf(TimeUnit.MINUTES.toMillis(5)));
+ // maximum lifetime of a connection in the pool, 0 ⇒ forever
+ properties.setProperty("maxLifetime",
String.valueOf(TimeUnit.HOURS.toMillis(12)));
+ // minimum number of idle connections that HikariCP tries to maintain in
the pool, 0 ⇒ none
+ properties.setProperty("minimumIdle", "0");
// maximum size that the pool is allowed to reach, including both idle
and in-use connections
- properties.setProperty("dataSource.maximumPoolSize", "10");
+ properties.setProperty("maximumPoolSize", "10");
// apply any HikariCP parameters the user may have set, overwriting
defaults
properties.putAll(config.getSourceParameters());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java
index e184a91..cafb2ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java
@@ -306,6 +306,9 @@ public class SSLConfigServer extends SSLConfig {
@Override
public int getHandshakeTimeout() {
+ // TODO: (DRILL-8183) why do we hard code this when we provide
+ // {@link ExecConstants.SSL_HANDSHAKE_TIMEOUT}?
+ // A value of 0 is interpreted by Netty as "no timeout".
return 0;
}