Github user merrimanr commented on a diff in the pull request:
https://github.com/apache/metron/pull/1250#discussion_r230366579
--- Diff:
metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
---
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.common.utils.JSONUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+import static
org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+import static
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
+
+/**
+ * Defines functions that enable REST requests with proper result and
error handling. Depends on an
+ * Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT
capability. Exposes various Http settings
+ * including authentication, proxy and timeouts through the global config
with the option to override any settings
+ * through a config object supplied in the expression.
+ */
+public class RestFunctions {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Looks up the CloseableHttpClient registered with the execution context.
+   *
+   * @param context The execution context.
+   * @return The CloseableHttpClient stored under the HTTP_CLIENT capability.
+   * @throws IllegalStateException if the HTTP_CLIENT capability is not present.
+   */
+  private static CloseableHttpClient getHttpClient(Context context) {
+    Optional<Object> capability = context.getCapability(Context.Capabilities.HTTP_CLIENT);
+    if (!capability.isPresent()) {
+      throw new IllegalStateException("Missing HTTP_CLIENT; http connection required");
+    }
+    return (CloseableHttpClient) capability.get();
+  }
+
+  /**
+   * Fetches a single positional argument and converts it to the requested type.
+   *
+   * @param index The zero-based position of the argument within {@code args}.
+   * @param clazz The type the argument should be converted to.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   * @return The argument at {@code index}, converted by ConversionUtils to {@code clazz}.
+   * @throws IllegalArgumentException if {@code args} holds fewer than {@code index + 1} elements.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+    final int argCount = args.size();
+    if (argCount <= index) {
+      throw new IllegalArgumentException(
+          format("Expected at least %d argument(s), found %d", index + 1, argCount));
+    }
+
+    Object rawArg = args.get(index);
+    return ConversionUtils.convert(rawArg, clazz);
+  }
+
+ @Stellar(
+ namespace = "REST",
+ name = "GET",
+ description = "Performs a REST GET request and parses the JSON
results into a map.",
+ params = {
+ "url - URI to the REST service",
+ "rest_config - Optional - Map (in curly braces) of
name:value pairs, each overriding the global config parameter " +
+ "of the same name. Default is the empty Map,
meaning no overrides."
+ },
+ returns = "JSON results as a Map")
+ public static class RestGet implements StellarFunction {
+
+ /**
+ * Whether the function has been initialized.
+ */
+ private boolean initialized = false;
+
+ /**
+ * The CloseableHttpClient.
+ */
+ private CloseableHttpClient httpClient;
+
+ /**
+ * Executor used to impose a hard request timeout.
+ */
+ private ScheduledExecutorService scheduledExecutorService;
+
+ /**
+ * Apply the function.
+ * @param args The function arguments including uri and rest config.
+ * @param context Stellar context
+ */
+ @Override
+ public Object apply(List<Object> args, Context context) throws
ParseException {
+ RestConfig restConfig = new RestConfig();
+ try {
+ URI uri = new URI(getArg(0, String.class, args));
+ Optional<Object> globalCapability =
context.getCapability(GLOBAL_CONFIG, false);
+
+ Map<String, Object> globalConfig = (Map<String, Object>)
globalCapability.get();
+
+ restConfig = getRestConfig(args, globalConfig);
+
+ HttpHost target = new HttpHost(uri.getHost(), uri.getPort(),
uri.getScheme());
+ Optional<HttpHost> proxy = getProxy(restConfig);
+ HttpClientContext httpClientContext =
getHttpClientContext(restConfig, target, proxy);
+
+ HttpGet httpGet = new HttpGet(uri);
+ httpGet.addHeader("Accept", "application/json");
+ httpGet.setConfig(getRequestConfig(restConfig, proxy));
+
+ return doGet(restConfig, httpGet, httpClientContext);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ return restConfig.getErrorValueOverride();
+ }
+ }
+
+    /**
+     * Perform the HttpClient get and handle the results.
+     * <ul>
+     *   <li>If the status code is in the configured list of allowed codes and the response has
+     *   content, the content (expected to be json) is parsed into a Map.</li>
+     *   <li>If the status code is allowed but there is no content, the configured empty content
+     *   override is returned.</li>
+     *   <li>Otherwise an IOException describing the unexpected status code is thrown.</li>
+     * </ul>
+     * The response is always closed before this method returns.
+     *
+     * @param restConfig The effective rest config.
+     * @param httpGet The prepared GET request.
+     * @param httpClientContext The context (credentials, etc.) to execute the request with.
+     * @return The parsed JSON Map, or the empty content override when the response has no content.
+     * @throws IOException if the request fails, exceeds the configured timeout, returns a status
+     *         code that is not allowed, or returns content that cannot be parsed.
+     */
+    private Object doGet(RestConfig restConfig, HttpGet httpGet, HttpClientContext httpClientContext) throws IOException {
+      // try-with-resources releases the response (and the connection behind it) on every path,
+      // including parse failures and disallowed status codes.
+      try (CloseableHttpResponse response = executeWithTimeout(restConfig, httpGet, httpClientContext)) {
+        int statusCode = response.getStatusLine().getStatusCode();
+        HttpEntity httpEntity = response.getEntity();
+
+        // Read the body instead of trusting getContentLength(): the length is negative when
+        // unknown (e.g. chunked transfer encoding) even though content is present.
+        // UTF-8 is only the fallback used when the response does not declare a charset.
+        String content = httpEntity == null ? null : EntityUtils.toString(httpEntity, StandardCharsets.UTF_8);
+
+        if (!restConfig.getResponseCodesAllowed().contains(statusCode)) {
+          throw new IOException(String.format("Stellar REST request to %s expected status code to be one of %s but " +
+                  "failed with http status code %d: %s",
+                  httpGet.getURI().toString(),
+                  restConfig.getResponseCodesAllowed().toString(),
+                  statusCode,
+                  content));
+        }
+
+        // Parse the response if present, return the empty content override if not
+        if (content != null && !content.isEmpty()) {
+          return JSONUtils.INSTANCE.load(content, JSONUtils.MAP_SUPPLIER);
+        }
+        return restConfig.getEmptyContentOverride();
+      }
+    }
+
+    /**
+     * Executes the request while a scheduled task stands ready to abort it once the rest config
+     * "timeout" setting elapses. The abort task is cancelled as soon as execute returns or throws,
+     * so reading the response body afterwards is not covered by this timer.
+     *
+     * @param restConfig The effective rest config; supplies the timeout in milliseconds.
+     * @param httpGet The prepared GET request.
+     * @param httpClientContext The context to execute the request with.
+     * @return The open response; the caller is responsible for closing it.
+     * @throws IOException if the request fails or was aborted because the timeout was exceeded.
+     */
+    private CloseableHttpResponse executeWithTimeout(RestConfig restConfig, HttpGet httpGet,
+                                                     HttpClientContext httpClientContext) throws IOException {
+      // Schedule a command to abort the httpGet request if the timeout is exceeded
+      ScheduledFuture<?> abortTask =
+          scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), TimeUnit.MILLISECONDS);
+      try {
+        return httpClient.execute(httpGet, httpClientContext);
+      } catch (IOException e) {
+        // Report a timeout if the httpGet request was aborted. Otherwise rethrow exception.
+        if (httpGet.isAborted()) {
+          throw new IOException(String.format("Total Stellar REST request time to %s exceeded the configured timeout of %d ms.",
+                  httpGet.getURI().toString(), restConfig.getTimeout()), e);
+        }
+        throw e;
+      } finally {
+        // Always drop the pending abort, on success and on failure, so it cannot linger in the
+        // executor. Cancelling a task that already ran is a no-op.
+        abortTask.cancel(true);
+      }
+    }
+
+    /**
+     * Assembles the effective RestConfig. Sources are layered from lowest to highest priority:
+     * <ul>
+     * <li>default rest config</li>
+     * <li>rest config stored in the global config</li>
+     * <li>rest config supplied as an expression parameter</li>
+     * </ul>
+     * A higher priority source only replaces the settings it actually specifies.
+     * @param args The function arguments; the optional second argument is the rest config map.
+     * @param globalConfig The global config, which may hold rest settings under STELLAR_REST_SETTINGS.
+     * @return The merged RestConfig.
+     * @throws IOException declared for callers and overriders; not thrown by the statements shown here.
+     */
+    protected RestConfig getRestConfig(List<Object> args, Map<String, Object> globalConfig) throws IOException {
+      Object globalSettings = globalConfig.get(STELLAR_REST_SETTINGS);
+      Map<String, Object> fromGlobal = (Map<String, Object>) globalSettings;
+      Map<String, Object> fromExpression = args.size() > 1 ? getArg(1, Map.class, args) : null;
+
+      // Later putAll calls win, so apply the sources from lowest to highest priority
+      RestConfig merged = new RestConfig();
+      if (fromGlobal != null) {
+        merged.putAll(fromGlobal);
+      }
+      if (fromExpression != null) {
+        merged.putAll(fromExpression);
+      }
+      return merged;
+    }
+
+    /**
+     * Creates the proxy HttpHost when both the proxy host and proxy port rest config settings are set.
+     * @param restConfig The effective rest config.
+     * @return The proxy (using the "http" scheme), or an empty Optional when either setting is missing.
+     */
+    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
+      if (restConfig.getProxyHost() == null || restConfig.getProxyPort() == null) {
+        return Optional.empty();
+      }
+      HttpHost proxyHost = new HttpHost(restConfig.getProxyHost(), restConfig.getProxyPort(), "http");
+      return Optional.of(proxyHost);
+    }
+
+    /**
+     * Translates the rest config into an HttpClient RequestConfig. Each timeout is only applied
+     * when the corresponding rest config setting is non-null; the proxy is only applied when present.
+     * @param restConfig The effective rest config.
+     * @param proxy The proxy to route the request through, if any.
+     * @return The RequestConfig to attach to the request.
+     */
+    protected RequestConfig getRequestConfig(RestConfig restConfig, Optional<HttpHost> proxy) {
+      RequestConfig.Builder builder = RequestConfig.custom();
+
+      if (restConfig.getConnectTimeout() != null) {
+        builder.setConnectTimeout(restConfig.getConnectTimeout());
+      }
+      if (restConfig.getConnectionRequestTimeout() != null) {
+        builder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
+      }
+      if (restConfig.getSocketTimeout() != null) {
+        builder.setSocketTimeout(restConfig.getSocketTimeout());
+      }
+      if (proxy.isPresent()) {
+        builder.setProxy(proxy.get());
+      }
+
+      return builder.build();
+    }
+
+ /**
+ * Builds the HttpClientContext object by setting the basic auth
and/or proxy basic auth credentials when the
+ * necessary rest config settings are configured. Passwords are
stored in HDFS.
+ * @param restConfig
+ * @param target
+ * @param proxy
+ * @return
+ * @throws IOException
+ */
+ protected HttpClientContext getHttpClientContext(RestConfig
restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
+ HttpClientContext httpClientContext = HttpClientContext.create();
+ boolean credentialsAdded = false;
+ CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
+
+ // Add the basic auth credentials if the rest config settings are
present
+ if (restConfig.getBasicAuthUser() != null &&
restConfig.getBasicAuthPasswordPath() != null) {
+ String password = new String(readBytes(new
Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8).trim();
--- End diff --
Just being defensive. I found I had to do this when reading the contents
from HDFS. I can look into it again and try to track down exactly what the
problem was.
---