This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bfb8e752 Support ADLS authentication with AAD + proxy (#8654)
b9bfb8e752 is described below

commit b9bfb8e752f61079f42c2c857cacbd673eadf5d1
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Mon May 9 23:49:06 2022 -0700

    Support ADLS authentication with AAD + proxy (#8654)
    
    Currently, our ADLSGen2PinotFS implementation supports authentication
    using Azure Active Directory. This PR adds support to use a proxy
    to connect to the ADLS.
    
    Co-authored-by: Vivek Iyer Vaidyanathan <[email protected]>
---
 .../pinot/plugin/filesystem/ADLSGen2PinotFS.java   | 83 ++++++++++++++++++----
 1 file changed, 68 insertions(+), 15 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 06fb122042..ff53cb35d5 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.plugin.filesystem;
 
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.core.util.Context;
 import com.azure.identity.ClientSecretCredential;
@@ -39,6 +41,7 @@ import com.azure.storage.file.datalake.models.PathHttpHeaders;
 import com.azure.storage.file.datalake.models.PathItem;
 import com.azure.storage.file.datalake.models.PathProperties;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -46,6 +49,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
@@ -70,6 +74,12 @@ import org.slf4j.LoggerFactory;
 public class ADLSGen2PinotFS extends BasePinotFS {
   private static final Logger LOGGER = LoggerFactory.getLogger(ADLSGen2PinotFS.class);
 
+  private enum AuthenticationType {
+    ACCESS_KEY,
+    AZURE_AD,
+    AZURE_AD_WITH_PROXY
+  }
+  private static final String AUTHENTICATION_TYPE = "authenticationType";
   private static final String ACCOUNT_NAME = "accountName";
   private static final String ACCESS_KEY = "accessKey";
   private static final String FILE_SYSTEM_NAME = "fileSystemName";
@@ -77,6 +87,10 @@ public class ADLSGen2PinotFS extends BasePinotFS {
   private static final String CLIENT_ID = "clientId";
   private static final String CLIENT_SECRET = "clientSecret";
   private static final String TENANT_ID = "tenantId";
+  private static final String PROXY_HOST = "proxyHost";
+  private static final String PROXY_PORT = "proxyPort";
+  private static final String PROXY_USERNAME = "proxyUsername";
+  private static final String PROXY_PASSWORD = "proxyPassword";
 
   private static final String HTTPS_URL_PREFIX = "https://";
 
@@ -115,11 +129,17 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     String accountName = config.getProperty(ACCOUNT_NAME);
 
     // TODO: consider to add the encryption of the following config
+    String authTypeStr = config.getProperty(AUTHENTICATION_TYPE, AuthenticationType.ACCESS_KEY.name());
+    AuthenticationType authType = AuthenticationType.valueOf(authTypeStr.toUpperCase());
     String accessKey = config.getProperty(ACCESS_KEY);
     String fileSystemName = config.getProperty(FILE_SYSTEM_NAME);
     String clientId = config.getProperty(CLIENT_ID);
     String clientSecret = config.getProperty(CLIENT_SECRET);
     String tenantId = config.getProperty(TENANT_ID);
+    String proxyHost = config.getProperty(PROXY_HOST);
+    String proxyUsername = config.getProperty(PROXY_USERNAME);
+    String proxyPassword = config.getProperty(PROXY_PASSWORD);
+    Integer proxyPort = Integer.parseInt(config.getProperty(PROXY_PORT));
 
     String dfsServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_STORAGE_DNS_SUFFIX;
     String blobServiceEndpointUrl = HTTPS_URL_PREFIX + accountName + AZURE_BLOB_DNS_SUFFIX;
@@ -128,21 +148,54 @@ public class ADLSGen2PinotFS extends BasePinotFS {
         new DataLakeServiceClientBuilder().endpoint(dfsServiceEndpointUrl);
     BlobServiceClientBuilder blobServiceClientBuilder = new BlobServiceClientBuilder().endpoint(blobServiceEndpointUrl);
 
-    if (accountName != null && accessKey != null) {
-      LOGGER.info("Authenticating using the access key to the account.");
-      StorageSharedKeyCredential sharedKeyCredential = new StorageSharedKeyCredential(accountName, accessKey);
-      dataLakeServiceClientBuilder.credential(sharedKeyCredential);
-      blobServiceClientBuilder.credential(sharedKeyCredential);
-    } else if (clientId != null && clientSecret != null && tenantId != null) {
-      LOGGER.info("Authenticating using Azure Active Directory");
-      ClientSecretCredential clientSecretCredential =
-          new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId).build();
-      dataLakeServiceClientBuilder.credential(clientSecretCredential);
-      blobServiceClientBuilder.credential(clientSecretCredential);
-    } else {
-      // Error out as at least one mode of auth info needed
-      throw new IllegalArgumentException(
-          "Expecting either (accountName, accessKey) or (clientId, clientSecret, tenantId)");
+    switch (authType) {
+      case ACCESS_KEY: {
+        LOGGER.info("Authenticating using the access key to the account.");
+        Preconditions.checkNotNull(accountName, "Account Name cannot be null");
+        Preconditions.checkNotNull(accessKey, "Access Key cannot be null");
+
+        StorageSharedKeyCredential sharedKeyCredential = new StorageSharedKeyCredential(accountName, accessKey);
+        dataLakeServiceClientBuilder.credential(sharedKeyCredential);
+        blobServiceClientBuilder.credential(sharedKeyCredential);
+        break;
+      }
+      case AZURE_AD: {
+        LOGGER.info("Authenticating using Azure Active Directory");
+        Preconditions.checkNotNull(clientId, "Client ID cannot be null");
+        Preconditions.checkNotNull(clientSecret, "ClientSecret cannot be null");
+        Preconditions.checkNotNull(tenantId, "TenantId cannot be null");
+
+        ClientSecretCredential clientSecretCredential =
+            new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId)
+                .build();
+        dataLakeServiceClientBuilder.credential(clientSecretCredential);
+        blobServiceClientBuilder.credential(clientSecretCredential);
+        break;
+      }
+      case AZURE_AD_WITH_PROXY: {
+        LOGGER.info("Authenticating using Azure Active Directory with proxy");
+        Preconditions.checkNotNull(clientId, "Client Id cannot be null");
+        Preconditions.checkNotNull(clientSecret, "ClientSecret cannot be null");
+        Preconditions.checkNotNull(tenantId, "Tenant Id cannot be null");
+        Preconditions.checkNotNull(proxyHost, "Proxy Host cannot be null");
+        Preconditions.checkNotNull(proxyPort, "Proxy Port cannot be null");
+        Preconditions.checkNotNull(proxyUsername, "Proxy Username cannot be null");
+        Preconditions.checkNotNull(proxyPassword, "Proxy Password cannot be null");
+
+        NettyAsyncHttpClientBuilder builder = new NettyAsyncHttpClientBuilder();
+        builder.proxy(
+            new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)).setCredentials(
+                proxyUsername, proxyPassword));
+        ClientSecretCredentialBuilder clientSecretCredentialBuilder =
+            new ClientSecretCredentialBuilder().clientId(clientId).clientSecret(clientSecret).tenantId(tenantId);
+        clientSecretCredentialBuilder.httpClient(builder.build());
+
+        dataLakeServiceClientBuilder.credential(clientSecretCredentialBuilder.build());
+        blobServiceClientBuilder.credential(clientSecretCredentialBuilder.build());
+        break;
+      }
+      default:
+        throw new IllegalStateException("Expecting valid authType. One of (ACCESS_KEY, AZURE_AD, AZURE_AD_WITH_PROXY");
     }
 
     _blobServiceClient = blobServiceClientBuilder.buildClient();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to