Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/946#discussion_r178923287
--- Diff:
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
---
@@ -107,32 +113,94 @@ public static String getBaseIndexName(String
indexName) {
return parts[0];
}
- public static TransportClient getClient(Map<String, Object>
globalConfiguration, Map<String, String> optionalSettings) {
+ /**
+ * Instantiates an Elasticsearch client based on es.client.class, if
set. Defaults to
+ * org.elasticsearch.transport.client.PreBuiltTransportClient.
+ *
+ * @param globalConfiguration Metron global config
+ * @return
+ */
+ public static TransportClient getClient(Map<String, Object>
globalConfiguration) {
+ Set<String> customESSettings = new HashSet<>();
+ customESSettings.addAll(Arrays.asList("es.client.class",
"es.xpack.username", "es.xpack.password.file"));
Settings.Builder settingsBuilder = Settings.builder();
- settingsBuilder.put("cluster.name",
globalConfiguration.get("es.clustername"));
- settingsBuilder.put("client.transport.ping_timeout","500s");
- if (optionalSettings != null) {
- settingsBuilder.put(optionalSettings);
+ Map<String, String> esSettings = getEsSettings(globalConfiguration);
+ for (Map.Entry<String, String> entry : esSettings.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (!customESSettings.contains(key)) {
+ settingsBuilder.put(key, value);
+ }
}
- Settings settings = settingsBuilder.build();
- TransportClient client;
- try{
+ settingsBuilder.put("cluster.name",
globalConfiguration.get("es.clustername"));
+ settingsBuilder.put("client.transport.ping_timeout",
esSettings.getOrDefault("client.transport.ping_timeout","500s"));
+ setXPackSecurityOrNone(settingsBuilder, esSettings);
+
+ try {
LOG.info("Number of available processors in Netty: {}",
NettyRuntimeWrapper.availableProcessors());
// Netty sets available processors statically and if an attempt is
made to set it more than
// once an IllegalStateException is thrown by
NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
//
https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
//
https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
System.setProperty("es.set.netty.runtime.available.processors",
"false");
- client = new PreBuiltTransportClient(settings);
- for(HostnamePort hp : getIps(globalConfiguration)) {
+ TransportClient client =
createTransportClient(settingsBuilder.build(), esSettings);
+ for (HostnamePort hp : getIps(globalConfiguration)) {
client.addTransportAddress(
new
InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
);
}
- } catch (UnknownHostException exception){
+ return client;
+ } catch (UnknownHostException exception) {
throw new RuntimeException(exception);
}
- return client;
+ }
+
+ private static Map<String, String> getEsSettings(Map<String, Object>
config) {
+ return ConversionUtils
+ .convertMap((Map<String, Object>)
config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
+ String.class);
+ }
+
+ private static void setXPackSecurityOrNone(Settings.Builder
settingsBuilder, Map<String, String> esSettings) {
+ if (esSettings.containsKey("es.xpack.password.file")) {
--- End diff --
@nickwallen and @wardbekker - how about we do 3 things here?
1. Check for both keys
```
if (esSettings.containsKey("es.xpack.username") &&
esSettings.containsKey("es.xpack.password.file"))
```
2. Warn on only 1 of the 2 required keys being present
```
else if (esSettings.containsKey("es.xpack.username") ^
esSettings.containsKey("es.xpack.password.file"))
```
3. Do a null/empty check on the username, similar to the one for the password file
```
if (StringUtils.isEmpty(xpackUsername)) {
throw new IllegalArgumentException("X-Pack username cannot be empty");
}
```
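Putting the three together, here is a rough standalone sketch of what I have in mind. It is not the actual patch: the class name `XPackSettingsCheck`, the `Result` enum, and the `check` method are made up for illustration, the warning goes to stderr instead of `LOG`, and a plain null/empty test stands in for `StringUtils.isEmpty` so the snippet has no dependencies.

```java
import java.util.Map;

// Hypothetical helper, for illustration only; not part of ElasticsearchUtils.
class XPackSettingsCheck {
  static final String USERNAME_KEY = "es.xpack.username";
  static final String PASSWORD_FILE_KEY = "es.xpack.password.file";

  // Outcome of inspecting the settings map (assumed names).
  enum Result { SECURED, INCOMPLETE, NONE }

  static Result check(Map<String, String> esSettings) {
    boolean hasUser = esSettings.containsKey(USERNAME_KEY);
    boolean hasPasswordFile = esSettings.containsKey(PASSWORD_FILE_KEY);

    if (hasUser && hasPasswordFile) {
      // 1. Both keys are present, so X-Pack security is intended.
      String xpackUsername = esSettings.get(USERNAME_KEY);
      // 3. Null/empty check on the username (stand-in for StringUtils.isEmpty).
      if (xpackUsername == null || xpackUsername.isEmpty()) {
        throw new IllegalArgumentException("X-Pack username cannot be empty");
      }
      return Result.SECURED;
    } else if (hasUser ^ hasPasswordFile) {
      // 2. Exactly one of the two required keys is present: warn, don't fail.
      System.err.println("WARN: X-Pack security requires both " + USERNAME_KEY
          + " and " + PASSWORD_FILE_KEY + "; only one was provided");
      return Result.INCOMPLETE;
    }
    // Neither key is present: no X-Pack security configured.
    return Result.NONE;
  }
}
```

In the real method the `SECURED` branch would go on to read the password file and set the security settings on the builder, and the other two branches would fall through to the non-secured client.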
---