Github user merrimanr commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1250#discussion_r230366579
  
    --- Diff: 
metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/RestFunctions.java
 ---
    @@ -0,0 +1,351 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.stellar.dsl.functions;
    +
    +import org.apache.commons.io.IOUtils;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.http.HttpEntity;
    +import org.apache.http.HttpHost;
    +import org.apache.http.auth.AuthScope;
    +import org.apache.http.auth.UsernamePasswordCredentials;
    +import org.apache.http.client.CredentialsProvider;
    +import org.apache.http.client.config.RequestConfig;
    +import org.apache.http.client.methods.CloseableHttpResponse;
    +import org.apache.http.client.methods.HttpGet;
    +import org.apache.http.client.protocol.HttpClientContext;
    +import org.apache.http.impl.client.BasicCredentialsProvider;
    +import org.apache.http.impl.client.CloseableHttpClient;
    +import org.apache.http.util.EntityUtils;
    +import org.apache.metron.stellar.common.utils.ConversionUtils;
    +import org.apache.metron.stellar.common.utils.JSONUtils;
    +import org.apache.metron.stellar.dsl.Context;
    +import org.apache.metron.stellar.dsl.ParseException;
    +import org.apache.metron.stellar.dsl.Stellar;
    +import org.apache.metron.stellar.dsl.StellarFunction;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.lang.invoke.MethodHandles;
    +import java.net.URI;
    +import java.net.URISyntaxException;
    +import java.nio.charset.StandardCharsets;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ScheduledFuture;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.lang.String.format;
    +import static 
org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
    +import static 
org.apache.metron.stellar.dsl.functions.RestConfig.STELLAR_REST_SETTINGS;
    +
    +/**
    + * Defines functions that enable REST requests with proper result and 
error handling.  Depends on an
    + * Apache HttpComponents client being supplied as a Stellar HTTP_CLIENT 
capability.  Exposes various Http settings
    + * including authentication, proxy and timeouts through the global config 
with the option to override any settings
    + * through a config object supplied in the expression.
    + */
    +public class RestFunctions {
    +
    +  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +
    +  /**
    +   * Retrieves the ClosableHttpClient from the execution context.
    +   *
    +   * @param context The execution context.
    +   * @return A ClosableHttpClient, if one exists.  Otherwise, an exception 
is thrown.
    +   */
    +  private static CloseableHttpClient getHttpClient(Context context) {
    +    Optional<Object> clientOpt = 
context.getCapability(Context.Capabilities.HTTP_CLIENT);
    +    if(clientOpt.isPresent()) {
    +      return (CloseableHttpClient) clientOpt.get();
    +    } else {
    +      throw new IllegalStateException("Missing HTTP_CLIENT; http 
connection required");
    +    }
    +  }
    +
    +  /**
    +   * Get an argument from a list of arguments.
    +   *
    +   * @param index The index within the list of arguments.
    +   * @param clazz The type expected.
    +   * @param args All of the arguments.
    +   * @param <T> The type of the argument expected.
    +   */
    +  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) 
{
    +
    +    if(index >= args.size()) {
    +      throw new IllegalArgumentException(format("Expected at least %d 
argument(s), found %d", index+1, args.size()));
    +    }
    +
    +    return ConversionUtils.convert(args.get(index), clazz);
    +  }
    +
    +  @Stellar(
    +          namespace = "REST",
    +          name = "GET",
    +          description = "Performs a REST GET request and parses the JSON 
results into a map.",
    +          params = {
    +                  "url - URI to the REST service",
    +                  "rest_config - Optional - Map (in curly braces) of 
name:value pairs, each overriding the global config parameter " +
    +                          "of the same name. Default is the empty Map, 
meaning no overrides."
    +          },
    +          returns = "JSON results as a Map")
    +  public static class RestGet implements StellarFunction {
    +
    +    /**
    +     * Whether the function has been initialized.
    +     */
    +    private boolean initialized = false;
    +
    +    /**
    +     * The CloseableHttpClient.
    +     */
    +    private CloseableHttpClient httpClient;
    +
    +    /**
    +     * Executor used to impose a hard request timeout.
    +     */
    +    private ScheduledExecutorService scheduledExecutorService;
    +
    +    /**
    +     * Apply the function.
    +     * @param args The function arguments including uri and rest config.
    +     * @param context Stellar context
    +     */
    +    @Override
    +    public Object apply(List<Object> args, Context context) throws 
ParseException {
    +      RestConfig restConfig = new RestConfig();
    +      try {
    +        URI uri = new URI(getArg(0, String.class, args));
    +        Optional<Object> globalCapability = 
context.getCapability(GLOBAL_CONFIG, false);
    +
    +        Map<String, Object> globalConfig = (Map<String, Object>) 
globalCapability.get();
    +
    +        restConfig = getRestConfig(args, globalConfig);
    +
    +        HttpHost target = new HttpHost(uri.getHost(), uri.getPort(), 
uri.getScheme());
    +        Optional<HttpHost> proxy = getProxy(restConfig);
    +        HttpClientContext httpClientContext = 
getHttpClientContext(restConfig, target, proxy);
    +
    +        HttpGet httpGet = new HttpGet(uri);
    +        httpGet.addHeader("Accept", "application/json");
    +        httpGet.setConfig(getRequestConfig(restConfig, proxy));
    +
    +        return doGet(restConfig, httpGet, httpClientContext);
    +      } catch (URISyntaxException e) {
    +        throw new IllegalArgumentException(e.getMessage(), e);
    +      } catch (IOException e) {
    +        LOG.error(e.getMessage(), e);
    +        return restConfig.getErrorValueOverride();
    +      }
    +    }
    +
    +    /**
    +     * Perform the HttpClient get and handle the results.  A configurable 
list of status codes are accepted and the
    +     * response content (expected to be json) is parsed into a Map.  
Values returned on errors and when response content
    +     * is also configurable.  The rest config "timeout" setting is imposed 
in this method and will abort the get request
    +     * if exceeded.
    +     *
    +     * @param restConfig
    +     * @param httpGet
    +     * @param httpClientContext
    +     * @return
    +     * @throws IOException
    +     */
    +    private Object doGet(RestConfig restConfig, HttpGet httpGet, 
HttpClientContext httpClientContext) throws IOException {
    +
    +      // Schedule a command to abort the httpGet request if the timeout is 
exceeded
    +      ScheduledFuture scheduledFuture = 
scheduledExecutorService.schedule(httpGet::abort, restConfig.getTimeout(), 
TimeUnit.MILLISECONDS);
    +      CloseableHttpResponse response;
    +      try {
    +        response = httpClient.execute(httpGet, httpClientContext);
    +      } catch(IOException e) {
    +        // Report a timeout if the httpGet request was aborted.  Otherwise 
rethrow exception.
    +        if (httpGet.isAborted()) {
    +          throw new IOException(String.format("Total Stellar REST request 
time to %s exceeded the configured timeout of %d ms.", 
httpGet.getURI().toString(), restConfig.getTimeout()));
    +        } else {
    +          throw e;
    +        }
    +      }
    +
    +      // Cancel the future if the request finished within the timeout
    +      if (!scheduledFuture.isDone()) {
    +        scheduledFuture.cancel(true);
    +      }
    +      int statusCode = response.getStatusLine().getStatusCode();
    +      if (restConfig.getResponseCodesAllowed().contains(statusCode)) {
    +        HttpEntity httpEntity = response.getEntity();
    +
    +        // Parse the reponse if present, return the empty value override 
if not
    +        if (httpEntity != null && httpEntity.getContentLength() > 0) {
    +          String json = EntityUtils.toString(response.getEntity());
    +          return JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER);
    +        }
    +        return restConfig.getEmptyContentOverride();
    +      } else {
    +        throw new IOException(String.format("Stellar REST request to %s 
expected status code to be one of %s but " +
    +                "failed with http status code %d: %s",
    +                httpGet.getURI().toString(),
    +                restConfig.getResponseCodesAllowed().toString(),
    +                statusCode,
    +                EntityUtils.toString(response.getEntity())));
    +      }
    +    }
    +
    +    /**
    +     * Build the RestConfig object using the following order of precedence:
    +     * <ul>
    +     *   <li>rest config supplied as an expression parameter</li>
    +     *   <li>rest config stored in the global config</li>
    +     *   <li>default rest config</li>
    +     * </ul>
    +     * Only settings specified in the rest config will override lower 
priority config settings.
    +     * @param args
    +     * @param globalConfig
    +     * @return
    +     * @throws IOException
    +     */
    +    protected RestConfig getRestConfig(List<Object> args, Map<String, 
Object> globalConfig) throws IOException {
    +      Map<String, Object> globalRestConfig = (Map<String, Object>) 
globalConfig.get(STELLAR_REST_SETTINGS);
    +      Map<String, Object> functionRestConfig = null;
    +      if (args.size() > 1) {
    +        functionRestConfig = getArg(1, Map.class, args);
    +      }
    +
    +      // Add settings in order of precedence
    +      RestConfig restConfig = new RestConfig();
    +      if (globalRestConfig != null) {
    +        restConfig.putAll(globalRestConfig);
    +      }
    +      if (functionRestConfig != null) {
    +        restConfig.putAll(functionRestConfig);
    +      }
    +      return restConfig;
    +    }
    +
    +    /**
    +     * Returns the proxy HttpHost object if the proxy rest config settings 
are set.
    +     * @param restConfig
    +     * @return
    +     */
    +    protected Optional<HttpHost> getProxy(RestConfig restConfig) {
    +      Optional<HttpHost> proxy = Optional.empty();
    +      if (restConfig.getProxyHost() != null && restConfig.getProxyPort() 
!= null) {
    +        proxy = Optional.of(new HttpHost(restConfig.getProxyHost(), 
restConfig.getProxyPort(), "http"));
    +      }
    +      return proxy;
    +    }
    +
    +    /**
    +     * Builds the RequestConfig object by setting HttpClient settings 
defined in the rest config.
    +     * @param restConfig
    +     * @param proxy
    +     * @return
    +     */
    +    protected RequestConfig getRequestConfig(RestConfig restConfig, 
Optional<HttpHost> proxy) {
    +      RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
    +      if (restConfig.getConnectTimeout() != null) {
    +        
requestConfigBuilder.setConnectTimeout(restConfig.getConnectTimeout());
    +      }
    +      if (restConfig.getConnectionRequestTimeout() != null) {
    +        
requestConfigBuilder.setConnectionRequestTimeout(restConfig.getConnectionRequestTimeout());
    +      }
    +      if (restConfig.getSocketTimeout() != null) {
    +        
requestConfigBuilder.setSocketTimeout(restConfig.getSocketTimeout());
    +      }
    +
    +      proxy.ifPresent(requestConfigBuilder::setProxy);
    +      return requestConfigBuilder.build();
    +    }
    +
    +    /**
    +     * Builds the HttpClientContext object by setting the basic auth 
and/or proxy basic auth credentials when the
    +     * necessary rest config settings are configured.  Passwords are 
stored in HDFS.
    +     * @param restConfig
    +     * @param target
    +     * @param proxy
    +     * @return
    +     * @throws IOException
    +     */
    +    protected HttpClientContext getHttpClientContext(RestConfig 
restConfig, HttpHost target, Optional<HttpHost> proxy) throws IOException {
    +      HttpClientContext httpClientContext = HttpClientContext.create();
    +      boolean credentialsAdded = false;
    +      CredentialsProvider credentialsProvider = new 
BasicCredentialsProvider();
    +
    +      // Add the basic auth credentials if the rest config settings are 
present
    +      if (restConfig.getBasicAuthUser() != null && 
restConfig.getBasicAuthPasswordPath() != null) {
    +        String password = new String(readBytes(new 
Path(restConfig.getBasicAuthPasswordPath())), StandardCharsets.UTF_8).trim();
    --- End diff --
    
    Just being defensive.  I found I had to do this when reading the contents 
from HDFS.  I can look into it again and try to track down exactly what the 
problem was.


---

Reply via email to