Repository: beam Updated Branches: refs/heads/master db4b0939a -> 04f5bc6f8
[BEAM-1274] Add SSL mutual authentication support Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f48bb4be Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f48bb4be Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f48bb4be Branch: refs/heads/master Commit: f48bb4be1d3bb23f3cc978c4c25cf43842639296 Parents: aadbe36 Author: Jean-Baptiste Onofré <[email protected]> Authored: Mon Jul 24 17:53:15 2017 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Mon Aug 7 07:28:14 2017 +0200 ---------------------------------------------------------------------- .../sdk/io/elasticsearch/ElasticsearchIO.java | 69 +++++++++++++++++++- 1 file changed, 67 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f48bb4be/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 5046888..2cd3bcd 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -25,10 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; + +import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; -import java.net.MalformedURLException; import java.net.URL; +import java.security.KeyStore; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -39,6 +43,8 @@ import java.util.ListIterator; import java.util.Map; import java.util.NoSuchElementException; import javax.annotation.Nullable; +import javax.net.ssl.SSLContext; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -60,7 +66,9 @@ import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHeader; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.ssl.SSLContexts; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; @@ -155,6 +163,12 @@ public class ElasticsearchIO { @Nullable abstract String getPassword(); + @Nullable + abstract String getKeystorePath(); + + @Nullable + abstract String getKeystorePassword(); + abstract String getIndex(); abstract String getType(); @@ -169,6 +183,10 @@ public class ElasticsearchIO { abstract Builder setPassword(String password); + abstract Builder setKeystorePath(String keystorePath); + + abstract Builder setKeystorePassword(String password); + abstract Builder setIndex(String index); abstract Builder setType(String type); @@ -239,6 +257,32 @@ public class ElasticsearchIO { return builder().setPassword(password).build(); } + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the keystore containing the client key. + * + * @param keystorePath the location of the keystore containing the client key. + * @return the {@link ConnectionConfiguration} object with keystore path set. + */ + public ConnectionConfiguration withKeystorePath(String keystorePath) { + checkArgument(keystorePath != null, "ConnectionConfiguration.create()" + + ".withKeystorePath(keystorePath) called with null keystorePath"); + return builder().setKeystorePath(keystorePath).build(); + } + + /** + * If Elasticsearch uses SSL with mutual authentication (via shield), + * provide the password to open the client keystore. + * + * @param keystorePassword the password of the client keystore. + * @return the {@link ConnectionConfiguration} object with keystore password set. + */ + public ConnectionConfiguration withKeystorePassword(String keystorePassword) { + checkArgument(keystorePassword != null, "ConnectionConfiguration.create()" + + ".withKeystorePassword(keystorePassword) called with null keystorePassword"); + return builder().setKeystorePassword(keystorePassword).build(); + } + private void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("address", getAddresses().toString())); builder.add(DisplayData.item("index", getIndex())); @@ -246,7 +290,7 @@ public class ElasticsearchIO { builder.addIfNotNull(DisplayData.item("username", getUsername())); } - RestClient createClient() throws MalformedURLException { + RestClient createClient() throws IOException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; for (String address : getAddresses()) { @@ -267,6 +311,27 @@ public class ElasticsearchIO { } }); } + if (getKeystorePath() != null) { + try { + KeyStore keyStore = KeyStore.getInstance("jks"); + try (InputStream is = new FileInputStream(new File(getKeystorePath()))) { + keyStore.load(is, getKeystorePassword().toCharArray()); + } + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(keyStore, null).build(); + final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext); + restClientBuilder.setHttpClientConfigCallback( + new RestClientBuilder.HttpClientConfigCallback() { + @Override + public HttpAsyncClientBuilder customizeHttpClient( + HttpAsyncClientBuilder httpClientBuilder) { + return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy); + } + }); + } catch (Exception e) { + throw new IOException("Can't load the client certificate from the keystore", e); + } + } return restClientBuilder.build(); } }
