Repository: beam
Updated Branches:
  refs/heads/master db4b0939a -> 04f5bc6f8


[BEAM-1274] Add SSL mutual authentication support


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f48bb4be
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f48bb4be
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f48bb4be

Branch: refs/heads/master
Commit: f48bb4be1d3bb23f3cc978c4c25cf43842639296
Parents: aadbe36
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Mon Jul 24 17:53:15 2017 +0200
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Mon Aug 7 07:28:14 2017 +0200

----------------------------------------------------------------------
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 69 +++++++++++++++++++-
 1 file changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f48bb4be/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5046888..2cd3bcd 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -25,10 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
-import java.net.MalformedURLException;
 import java.net.URL;
+import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -39,6 +43,8 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import javax.net.ssl.SSLContext;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -60,7 +66,9 @@ import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
 import org.apache.http.message.BasicHeader;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
 import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.ssl.SSLContexts;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
@@ -155,6 +163,12 @@ public class ElasticsearchIO {
     @Nullable
     abstract String getPassword();
 
+    @Nullable
+    abstract String getKeystorePath();
+
+    @Nullable
+    abstract String getKeystorePassword();
+
     abstract String getIndex();
 
     abstract String getType();
@@ -169,6 +183,10 @@ public class ElasticsearchIO {
 
       abstract Builder setPassword(String password);
 
+      abstract Builder setKeystorePath(String keystorePath);
+
+      abstract Builder setKeystorePassword(String password);
+
       abstract Builder setIndex(String index);
 
       abstract Builder setType(String type);
@@ -239,6 +257,32 @@ public class ElasticsearchIO {
       return builder().setPassword(password).build();
     }
 
+    /**
+     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * provide the keystore containing the client key.
+     *
+     * @param keystorePath the location of the keystore containing the client key.
+     * @return the {@link ConnectionConfiguration} object with keystore path set.
+     */
+    public ConnectionConfiguration withKeystorePath(String keystorePath) {
+      checkArgument(keystorePath != null, "ConnectionConfiguration.create()"
+          + ".withKeystorePath(keystorePath) called with null keystorePath");
+      return builder().setKeystorePath(keystorePath).build();
+    }
+
+    /**
+     * If Elasticsearch uses SSL with mutual authentication (via shield),
+     * provide the password to open the client keystore.
+     *
+     * @param keystorePassword the password of the client keystore.
+     * @return the {@link ConnectionConfiguration} object with keystore password set.
+     */
+    public ConnectionConfiguration withKeystorePassword(String keystorePassword) {
+        checkArgument(keystorePassword != null, "ConnectionConfiguration.create()"
+            + ".withKeystorePassword(keystorePassword) called with null keystorePassword");
+        return builder().setKeystorePassword(keystorePassword).build();
+    }
+
     private void populateDisplayData(DisplayData.Builder builder) {
       builder.add(DisplayData.item("address", getAddresses().toString()));
       builder.add(DisplayData.item("index", getIndex()));
@@ -246,7 +290,7 @@ public class ElasticsearchIO {
       builder.addIfNotNull(DisplayData.item("username", getUsername()));
     }
 
-    RestClient createClient() throws MalformedURLException {
+    RestClient createClient() throws IOException {
       HttpHost[] hosts = new HttpHost[getAddresses().size()];
       int i = 0;
       for (String address : getAddresses()) {
@@ -267,6 +311,27 @@ public class ElasticsearchIO {
               }
             });
       }
+      if (getKeystorePath() != null) {
+        try {
+          KeyStore keyStore = KeyStore.getInstance("jks");
+          try (InputStream is = new FileInputStream(new File(getKeystorePath()))) {
+            keyStore.load(is, getKeystorePassword().toCharArray());
+          }
+          final SSLContext sslContext = SSLContexts.custom()
+              .loadTrustMaterial(keyStore, null).build();
+          final SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(sslContext);
+          restClientBuilder.setHttpClientConfigCallback(
+              new RestClientBuilder.HttpClientConfigCallback() {
+            @Override
+            public HttpAsyncClientBuilder customizeHttpClient(
+                HttpAsyncClientBuilder httpClientBuilder) {
+              return httpClientBuilder.setSSLContext(sslContext).setSSLStrategy(sessionStrategy);
+            }
+          });
+        } catch (Exception e) {
+          throw new IOException("Can't load the client certificate from the keystore", e);
+        }
+      }
       return restClientBuilder.build();
     }
   }

Reply via email to