This is an automated email from the ASF dual-hosted git repository. snagel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nutch.git
commit f6b17177ad6049b5642d9510cb60fe0a1d3b5f1c Author: tallison <talli...@apache.org> AuthorDate: Wed Mar 1 12:16:17 2023 -0500 NUTCH-2920 -- add keystore for 2-way tls; add back in no-tls option with a stern warning and possibly helpful links. --- .../opensearch1x/OpenSearch1xConstants.java | 6 +- .../opensearch1x/OpenSearch1xIndexWriter.java | 137 +++++++++++++++------ 2 files changed, 99 insertions(+), 44 deletions(-) diff --git a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java index 8ca5038dd..cb172bda2 100644 --- a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java +++ b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xConstants.java @@ -20,14 +20,14 @@ public interface OpenSearch1xConstants { String HOSTS = "host"; String PORT = "port"; String SCHEME = "scheme"; - String USER = "username"; String PASSWORD = "password"; - String USE_AUTH = "auth"; - String TRUST_STORE_PATH = "trust.store.path"; String TRUST_STORE_PASSWORD = "trust.store.password"; String TRUST_STORE_TYPE = "trust.store.type"; + String KEY_STORE_PATH = "key.store.path"; + String KEY_STORE_PASSWORD = "key.store.password"; + String KEY_STORE_TYPE = "key.store.type"; String INDEX = "index"; String MAX_BULK_DOCS = "max.bulk.docs"; String MAX_BULK_LENGTH = "max.bulk.size"; diff --git a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java index e796a69e4..a121f15a2 100644 --- a/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java +++ b/src/plugin/indexer-opensearch-1x/src/java/org/apache/nutch/indexwriter/opensearch1x/OpenSearch1xIndexWriter.java @@ -22,6 +22,7 @@ import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.ssl.SSLContextBuilder; @@ -52,18 +53,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.nio.file.Paths; +import java.security.GeneralSecurityException; import java.security.KeyStore; import java.time.format.DateTimeFormatter; + import java.util.AbstractMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -80,18 +83,19 @@ public class OpenSearch1xIndexWriter implements IndexWriter { private static final int DEFAULT_EXP_BACKOFF_RETRIES = 10; private static final int DEFAULT_BULK_CLOSE_TIMEOUT = 600; private static final String DEFAULT_INDEX = "nutch"; - private static final String DEFAULT_USER = "elastic"; - + private static final String DEFAULT_USER = "admin"; + private static final String DEFAULT_PASSWORD = "admin"; private String[] hosts; private int port; - private String scheme = HttpHost.DEFAULT_SCHEME_NAME; - private String user = null; - private String password = null; - private boolean auth; - + private String scheme = "https"; + private String user; + private String password; private String trustStorePath; private String trustStorePassword; private String trustStoreType; + private String keyStorePath; + private String keyStorePassword; + private String keyStoreType; private int maxBulkDocs; private int maxBulkLength; private int expBackoffMillis; @@ -105,6 +109,7 @@ public class OpenSearch1xIndexWriter implements IndexWriter { private Configuration config; + @Override public void open(Configuration conf, String name) throws IOException { // Implementation not required @@ -125,7 +130,7 @@ public class OpenSearch1xIndexWriter implements IndexWriter { String hosts = parameters.get(OpenSearch1xConstants.HOSTS); if (StringUtils.isBlank(hosts)) { - String message = "Missing elastic.host this should be set in index-writers.xml "; + String message = "Missing " + OpenSearch1xConstants.HOSTS + ". this should be set in index-writers.xml "; message += "\n" + describe(); LOG.error(message); throw new RuntimeException(message); @@ -179,18 +184,23 @@ public class OpenSearch1xIndexWriter implements IndexWriter { port = parameters.getInt(OpenSearch1xConstants.PORT, DEFAULT_PORT); scheme = parameters.get(OpenSearch1xConstants.SCHEME, HttpHost.DEFAULT_SCHEME_NAME); - auth = parameters.getBoolean(OpenSearch1xConstants.USE_AUTH, false); user = parameters.get(OpenSearch1xConstants.USER, DEFAULT_USER); - password = parameters.get(OpenSearch1xConstants.PASSWORD, ""); + password = parameters.get(OpenSearch1xConstants.PASSWORD, DEFAULT_PASSWORD); + trustStorePath = parameters.get(OpenSearch1xConstants.TRUST_STORE_PATH); - trustStorePassword = parameters.get( - OpenSearch1xConstants.TRUST_STORE_PASSWORD); - trustStoreType = parameters.get(OpenSearch1xConstants.TRUST_STORE_TYPE, - "JKS"); + trustStorePassword = parameters.get(OpenSearch1xConstants.TRUST_STORE_PASSWORD); + trustStoreType = parameters.get(OpenSearch1xConstants.TRUST_STORE_TYPE, "JKS"); + + keyStorePath = parameters.get(OpenSearch1xConstants.KEY_STORE_PATH); + keyStorePassword = parameters.get(OpenSearch1xConstants.KEY_STORE_PASSWORD); + keyStoreType = parameters.get(OpenSearch1xConstants.KEY_STORE_TYPE, "JKS"); + boolean basicAuth = user != null && password != null; final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(user, password)); + if (basicAuth) { + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(user, password)); + } RestHighLevelClient client = null; @@ -202,7 +212,7 @@ public class OpenSearch1xIndexWriter implements IndexWriter { } RestClientBuilder restClientBuilder = RestClient.builder(hostsList); - if ("http".equals(scheme) && auth) { + if ("http".equals(scheme) && basicAuth) { restClientBuilder.setHttpClientConfigCallback( new HttpClientConfigCallback() { @Override @@ -212,29 +222,17 @@ public class OpenSearch1xIndexWriter implements IndexWriter { credentialsProvider); } }); - } - - // In case of HTTPS, set up trust store - if ("https".equals(scheme)) { + } else if ("https".equals(scheme)) { try { - SSLContextBuilder sslBuilder = SSLContexts.custom(); - KeyStore trustStore = KeyStore.getInstance("JKS"); - try (InputStream is = Files.newInputStream( - Paths.get(trustStorePath))) { - trustStore.load(is, trustStorePassword.toCharArray()); - } - sslBuilder.loadTrustMaterial(trustStore, null); - final SSLContext sslContext = sslBuilder.build(); + + final SSLContext sslContext = createSSLContext(); restClientBuilder.setHttpClientConfigCallback( new HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient( HttpAsyncClientBuilder httpClientBuilder) { - //do we still want this?! - //httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); - if (auth) { - httpClientBuilder.setDefaultCredentialsProvider( - credentialsProvider); + if (basicAuth) { + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } return httpClientBuilder.setSSLContext(sslContext); } @@ -248,12 +246,55 @@ public class OpenSearch1xIndexWriter implements IndexWriter { client = new RestHighLevelClient(restClientBuilder); } else { throw new IOException( - "ElasticRestClient initialization Failed!!!\\n\\nPlease Provide the hosts"); + "OpenSearchRestClient initialization Failed!!!\\n\\nPlease Provide the hosts"); } return client; } + private SSLContext createSSLContext() throws GeneralSecurityException, IOException { + if (trustStorePath == null && keyStorePath == null) { + return SSLContexts.createDefault(); + } + + SSLContextBuilder sslBuilder = SSLContexts.custom(); + Optional<KeyStore> trustStore = loadStore(trustStorePath, trustStorePassword, trustStoreType); + Optional<KeyStore> keyStore = loadStore(keyStorePath, keyStorePassword, keyStoreType); + + if (trustStore.isPresent()) { + sslBuilder.loadTrustMaterial(trustStore.get(), null); + } else { + LOG.warn("You haven't set up a trust store. We're effectively turning off " + + " tls. This is 'Not a good idea'(tm). See a getting started guide: " + + "https://opensearch.org/blog/connecting-java-high-level-rest-client-with-opensearch-over-https/ "+ + " or in more depth: https://opensearch.org/docs/latest/security/configuration/tls/"); + sslBuilder.loadTrustMaterial(null, new TrustSelfSignedStrategy()); + } + + if (keyStore.isPresent()) { + //assuming the keystore and the key have the same password + sslBuilder.loadKeyMaterial(keyStore.get(), keyStorePassword.toCharArray()); + } + return sslBuilder.build(); + } + + private Optional<KeyStore> loadStore(String storePath, String storePassword, String storeType) + throws GeneralSecurityException, IOException { + if (StringUtils.isAllBlank(storePath)) { + return Optional.empty(); + } + if (StringUtils.isAllBlank(storePassword)) { + throw new IllegalArgumentException("must include a password for store: " + storePath); + } + + KeyStore store = KeyStore.getInstance(storeType); + try (InputStream is = Files.newInputStream( + Paths.get(storePath))) { + store.load(is, storePassword.toCharArray()); + } + return Optional.of(store); + } + /** * Generates a default BulkProcessor.Listener * @@ -268,7 +309,7 @@ public class OpenSearch1xIndexWriter implements IndexWriter { @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Elasticsearch indexing failed:", failure); + LOG.error("Opensearch indexing failed:", failure); } @Override @@ -358,10 +399,10 @@ public class OpenSearch1xIndexWriter implements IndexWriter { new AbstractMap.SimpleEntry<>("Comma-separated list of hostnames", this.hosts == null ? "" : String.join(",", hosts))); properties.put(OpenSearch1xConstants.PORT, - new AbstractMap.SimpleEntry<>("The port to connect to elastic server.", + new AbstractMap.SimpleEntry<>("The port to connect to opensearch server.", this.port)); properties.put(OpenSearch1xConstants.SCHEME, new AbstractMap.SimpleEntry<>( - "The scheme (http or https) to connect to elastic server.", + "The scheme (http or https) to connect to opensearch server.", this.scheme)); properties.put(OpenSearch1xConstants.INDEX, new AbstractMap.SimpleEntry<>("Default index to send documents to.", @@ -371,16 +412,26 @@ public class OpenSearch1xIndexWriter implements IndexWriter { this.user)); properties.put(OpenSearch1xConstants.PASSWORD, new AbstractMap.SimpleEntry<>("Password for auth credentials", - StringUtil.mask(this.password))); + StringUtil.mask(getOrEmptyString(this.password)))); + properties.put(OpenSearch1xConstants.TRUST_STORE_PATH, new AbstractMap.SimpleEntry<>("Trust store path", this.trustStorePath)); properties.put(OpenSearch1xConstants.TRUST_STORE_PASSWORD, new AbstractMap.SimpleEntry<>("Password for trust store", - StringUtil.mask(this.trustStorePassword))); + StringUtil.mask(getOrEmptyString(this.trustStorePassword)))); properties.put(OpenSearch1xConstants.TRUST_STORE_TYPE, new AbstractMap.SimpleEntry<>("Trust store type (default=JKS)", this.trustStoreType)); + properties.put(OpenSearch1xConstants.KEY_STORE_PATH, + new AbstractMap.SimpleEntry<>("Key store path", this.keyStorePath)); + properties.put(OpenSearch1xConstants.KEY_STORE_PASSWORD, + new AbstractMap.SimpleEntry<>("Password for key and key store", + StringUtil.mask(getOrEmptyString(this.keyStorePassword)))); + properties.put(OpenSearch1xConstants.KEY_STORE_TYPE, + new AbstractMap.SimpleEntry<>("Key store type (default=JKS)", + this.keyStoreType)); + properties.put(OpenSearch1xConstants.MAX_BULK_DOCS, new AbstractMap.SimpleEntry<>( "Maximum size of the bulk in number of documents.", @@ -404,6 +455,10 @@ public class OpenSearch1xIndexWriter implements IndexWriter { return properties; } + private static String getOrEmptyString(String nullable) { + return nullable != null ? nullable : StringUtils.EMPTY; + } + @Override public void setConf(Configuration conf) { config = conf;