[
https://issues.apache.org/jira/browse/METRON-1850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673066#comment-16673066
]
ASF GitHub Bot commented on METRON-1850:
----------------------------------------
Github user justinleet commented on a diff in the pull request:
https://github.com/apache/metron/pull/1250#discussion_r230363182
--- Diff:
metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
---
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static
org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+import static
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+
+/**
+ * Defines functions that enable REST requests with proper result and
error handling. Depends on an
+ * Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT
capability. Exposes various Http settings
+ * including authentication, proxy and timeouts through the global config
with the option to override any settings
+ * through a config object supplied in the expression.
+ */
+public class RestFunctions {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Retrieves the ClosableHttpClient from the execution context.
+ *
+ * @param context The execution context.
+ * @return A ClosableHttpClient, if one exists. Otherwise, an exception
is thrown.
+ */
+ private static CloseableHttpClient getHttpClient(Context context) {
+ Optional<Object> clientOpt =
context.getCapability(Context.Capabilities.HTTP_CLIENT);
+ if(clientOpt.isPresent()) {
+ return (CloseableHttpClient) clientOpt.get();
+ } else {
+ throw new IllegalStateException("Missing HTTP_CLIENT; http
connection required");
+ }
+ }
+
+ /**
+ * Get an argument from a list of arguments.
+ *
+ * @param index The index within the list of arguments.
+ * @param clazz The type expected.
+ * @param args All of the arguments.
+ * @param <T> The type of the argument expected.
+ */
+ public static <T> T getArg(int index, Class<T> clazz, List<Object> args)
{
+
+ if(index >= args.size()) {
+ throw new IllegalArgumentException(format("Expected at least %d
argument(s), found %d", index+1, args.size()));
+ }
+
+ return ConversionUtils.convert(args.get(index), clazz);
+ }
+
+ @Stellar(
+ namespace = "REST",
+ name = "GET",
+ description = "Performs a REST GET request and parses the JSON
results into a map.",
+ params = {
+ "url - URI to the REST service",
+ "rest_config - Optional - Map (in curly braces) of
name:value pairs, each overriding the global config parameter " +
+ "of the same name. Default is the empty Map,
meaning no overrides."
+ },
+ returns = "JSON results as a Map")
+ public static class RestGet implements StellarFunction {
+
+ /**
+ * Whether the function has been initialized.
+ */
+ private boolean initialized = false;
+
+ /**
+ * The CloseableHttpClient.
+ */
+ private CloseableHttpClient httpClient;
+
+ /**
+ * Executor used to impose a hard request timeout.
+ */
+ private ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * Apply the function.
+ * @param args The function arguments including uri and rest config.
+ * @param context Stellar context
+ */
+ @Override
+ public Object apply(List<Object> args, Context context) throws
ParseException {
+ RestConfig restConfig = new RestConfig();
+ try {
+ URI uri = new URI(getArg(0, String.class, args));
+ Optional<Object> globalCapability =
context.getCapability(GLOBAL_CONFIG, false);
+
+ Map<String, Object> globalConfig = (Map<String, Object>)
globalCapability.get();
+
+ restConfig = getRestConfig(args, globalConfig);
+
+ HttpHost target = new HttpHost(uri.getHost(), uri.getPort(),
uri.getScheme());
+ Optional<HttpHost> proxy = getProxy(restConfig);
+ HttpClientContext httpClientContext =
getHttpClientContext(restConfig, target, proxy);
+
+ HttpGet httpGet = new HttpGet(uri);
+ httpGet.addHeader("Accept", "application/json");
+ httpGet.setConfig(getRequestConfig(restConfig, proxy));
+
+ return doGet(restConfig, httpGet, httpClientContext);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return restConfig.getErrorValueOverride();
+ }
+ }
+
+ /**
+ * Perform the HttpClient get and handle the results. A configurable
list of status codes are accepted and the
+ * response content (expected to be json) is parsed into a Map.
Values returned on errors and when response content
+ * is also configurable. The rest config "timeout" setting is imposed
in this method and will abort the get request
+ * if exceeded.
+ *
+ * @param restConfig
+ * @param httpGet
+ * @param httpClientContext
+ * @return
+ * @throws IOException
+ */
+ private Object doGet(RestConfig restConfig, HttpGet httpGet,
HttpClientContext httpClientContext) throws IOException {
+
+ // Schedule a command to abort the httpGet request if the timeout is
exceeded
--- End diff --
Could you elaborate in the docs somewhere that this is the hard timeout,
and the reason we need the hard timeouts is that the lib provided timeouts are
insufficient to achieve the hard timeout?
> Stellar REST function
> ---------------------
>
> Key: METRON-1850
> URL: https://issues.apache.org/jira/browse/METRON-1850
> Project: Metron
> Issue Type: New Feature
> Reporter: Ryan Merriman
> Priority: Major
>
> It would be useful to be able to enrich messages with Stellar using 3rd party
> (or internal) REST services. At a minimum this function would:
> * Stellar function available to GET from an HTTP API
> * Optional parameters for basic auth (user/password) which generate correct
> Authorization header
> * Function returns null value for errors, connection failures etc and logs
> error
> * Function must provide and use pooled connection objects at the process
> level
> * Function must send Accept: application/json header
> * A global setting must be available to set a proxy for all API calls, and
> if present the proxy must be used.
> * Proxy authentication must also be supported using basic auth.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)