GitHub user wardbekker commented on a diff in the pull request:

    https://github.com/apache/metron/pull/946#discussion_r179060961
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---
    @@ -107,32 +113,94 @@ public static String getBaseIndexName(String indexName) {
         return parts[0];
       }
     
    -  public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) {
    +  /**
    +   * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to
    +   * org.elasticsearch.transport.client.PreBuiltTransportClient.
    +   *
    +   * @param globalConfiguration Metron global config
    +   * @return
    +   */
    +  public static TransportClient getClient(Map<String, Object> globalConfiguration) {
    +    Set<String> customESSettings = new HashSet<>();
    +    customESSettings.addAll(Arrays.asList("es.client.class", "es.xpack.username", "es.xpack.password.file"));
         Settings.Builder settingsBuilder = Settings.builder();
    -    settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
    -    settingsBuilder.put("client.transport.ping_timeout","500s");
    -    if (optionalSettings != null) {
    -      settingsBuilder.put(optionalSettings);
    +    Map<String, String> esSettings = getEsSettings(globalConfiguration);
    +    for (Map.Entry<String, String> entry : esSettings.entrySet()) {
    +      String key = entry.getKey();
    +      String value = entry.getValue();
    +      if (!customESSettings.contains(key)) {
    +        settingsBuilder.put(key, value);
    +      }
         }
    -    Settings settings = settingsBuilder.build();
    -    TransportClient client;
    -    try{
    +    settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
    +    settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s"));
    +    setXPackSecurityOrNone(settingsBuilder, esSettings);
    +
    +    try {
           LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors());
           // Netty sets available processors statically and if an attempt is made to set it more than
           // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
           // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
           // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
           System.setProperty("es.set.netty.runtime.available.processors", "false");
    -      client = new PreBuiltTransportClient(settings);
    -      for(HostnamePort hp : getIps(globalConfiguration)) {
    +      TransportClient client = createTransportClient(settingsBuilder.build(), esSettings);
    +      for (HostnamePort hp : getIps(globalConfiguration)) {
             client.addTransportAddress(
                     new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
             );
           }
    -    } catch (UnknownHostException exception){
    +      return client;
    +    } catch (UnknownHostException exception) {
           throw new RuntimeException(exception);
         }
    -    return client;
    +  }
    +
    +  private static Map<String, String> getEsSettings(Map<String, Object> config) {
    +    return ConversionUtils
    +        .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
    +            String.class);
    +  }
    +
    +  private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {
    +    if (esSettings.containsKey("es.xpack.password.file")) {
    --- End diff --
    
    Done


---

Reply via email to