This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c6f9f42a4 create proxied file system using multiple tokens (#3744)
c6f9f42a4 is described below

commit c6f9f42a4a81fb81a5b5701ae57decba541d0516
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 17 16:32:45 2023 -0700

    create proxied file system using multiple tokens (#3744)
---
 .../gobblin/util/ProxiedFileSystemCache.java       | 23 +++++++++++++++++++---
 .../gobblin/util/ProxiedFileSystemUtils.java       | 23 ++++++++++++----------
 .../java/org/apache/gobblin/util/WriterUtils.java  |  7 ++++---
 3 files changed, 37 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
index 0f4e94ac2..466dd6dc4 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.util;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -205,7 +207,9 @@ public class ProxiedFileSystemCache {
 
   /**
    * Cached version of {@link 
ProxiedFileSystemUtils#createProxiedFileSystemUsingToken(String, Token, URI, 
Configuration)}.
+   * Deprecated in favor of {@link #getProxiedFileSystemUsingTokens}
    */
+  @Deprecated
   @Builder(builderClassName = "ProxiedFileSystemFromToken", builderMethodName 
= "fromToken")
   private static FileSystem getProxiedFileSystemUsingToken(@NonNull String 
userNameToProxyAs, Token<?> userNameToken,
       URI fsURI, Configuration conf, FileSystem referenceFS) throws 
IOException, ExecutionException {
@@ -215,7 +219,20 @@ public class ProxiedFileSystemCache {
     Configuration actualConfiguration = resolveConfiguration(conf, 
referenceFS);
 
     return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI, 
userNameToProxyAs, referenceFS),
-        new CreateProxiedFileSystemFromToken(userNameToProxyAs, userNameToken, 
actualURI, actualConfiguration,
+        new CreateProxiedFileSystemFromToken(userNameToProxyAs, 
Collections.singletonList(userNameToken), actualURI, actualConfiguration,
+            referenceFS));
+  }
+
+  @Builder(builderClassName = "ProxiedFileSystemFromTokens", builderMethodName 
= "fromTokens")
+  private static FileSystem getProxiedFileSystemUsingTokens(@NonNull String 
userNameToProxyAs, List<Token<?>> userNameTokens,
+      URI fsURI, Configuration conf, FileSystem referenceFS) throws 
IOException, ExecutionException {
+    Preconditions.checkNotNull(userNameToProxyAs, "Must provide a user name to 
proxy as.");
+    Preconditions.checkNotNull(userNameTokens, "Must provide token for user to 
proxy.");
+    URI actualURI = resolveUri(fsURI, conf, referenceFS);
+    Configuration actualConfiguration = resolveConfiguration(conf, 
referenceFS);
+
+    return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI, 
userNameToProxyAs, referenceFS),
+        new CreateProxiedFileSystemFromToken(userNameToProxyAs, 
userNameTokens, actualURI, actualConfiguration,
             referenceFS));
   }
 
@@ -272,7 +289,7 @@ public class ProxiedFileSystemCache {
     @NonNull
     private final String userNameToProxyAs;
     @NonNull
-    private final Token<?> userNameToken;
+    private final List<Token<?>> userNameTokens;
     @NonNull
     private final URI uri;
     @NonNull
@@ -282,7 +299,7 @@ public class ProxiedFileSystemCache {
     @Override
     public FileSystem call() throws Exception {
       FileSystem fs = 
ProxiedFileSystemUtils.createProxiedFileSystemUsingToken(this.userNameToProxyAs,
-          this.userNameToken, this.uri, this.configuration);
+          this.userNameTokens, this.uri, this.configuration);
       if (this.referenceFS != null) {
         return decorateFilesystemFromReferenceFS(fs, this.referenceFS);
       }
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
index 3626199b3..6820d651e 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
@@ -20,9 +20,10 @@ package org.apache.gobblin.util;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closer;
 
@@ -86,10 +87,10 @@ public class ProxiedFileSystemUtils {
       case TOKEN:
         Preconditions.checkArgument(properties.containsKey(AUTH_TOKEN_PATH));
         Path tokenPath = new Path(properties.getProperty(AUTH_TOKEN_PATH));
-        Optional<Token<?>> proxyToken = getTokenFromSeqFile(userNameToProxyAs, 
tokenPath);
-        if (proxyToken.isPresent()) {
+        List<Token<?>> proxyTokens = getTokenFromSeqFile(userNameToProxyAs, 
tokenPath);
+        if (proxyTokens.size() > 0) {
           try {
-            return createProxiedFileSystemUsingToken(userNameToProxyAs, 
proxyToken.get(), fsURI, conf);
+            return createProxiedFileSystemUsingToken(userNameToProxyAs, 
proxyTokens, fsURI, conf);
           } catch (InterruptedException e) {
             throw new IOException("Failed to proxy as user " + 
userNameToProxyAs, e);
           }
@@ -168,17 +169,19 @@ public class ProxiedFileSystemUtils {
    * method to create a {@link FileSystem}.
    *
    * @param userNameToProxyAs The name of the user the super user should proxy 
as
-   * @param userNameToken The {@link Token} to add to the proxied user's 
{@link UserGroupInformation}.
+   * @param userNameTokens List of {@link Token}s to add to the proxied user's 
{@link UserGroupInformation}.
    * @param fsURI The {@link URI} for the {@link FileSystem} that should be 
created
    * @param conf The {@link Configuration} for the {@link FileSystem} that 
should be created
    *
    * @return a {@link FileSystem} that can execute commands on behalf of the 
specified userNameToProxyAs
    */
   static FileSystem createProxiedFileSystemUsingToken(@NonNull String 
userNameToProxyAs,
-      @NonNull Token<?> userNameToken, URI fsURI, Configuration conf) throws 
IOException, InterruptedException {
+      @NonNull List<Token<?>> userNameTokens, URI fsURI, Configuration conf) 
throws IOException, InterruptedException {
     UserGroupInformation ugi =
         UserGroupInformation.createProxyUser(userNameToProxyAs, 
UserGroupInformation.getLoginUser());
-    ugi.addToken(userNameToken);
+    for (Token<?> userNameToken : userNameTokens) {
+      ugi.addToken(userNameToken);
+    }
     return ugi.doAs(new ProxiedFileSystem(fsURI, conf));
   }
 
@@ -205,7 +208,7 @@ public class ProxiedFileSystemUtils {
    *
    * @return A {@link Token} for the given user name
    */
-  public static Optional<Token<?>> getTokenFromSeqFile(String userNameKey, 
Path tokenFilePath) throws IOException {
+  public static List<Token<?>> getTokenFromSeqFile(String userNameKey, Path 
tokenFilePath) throws IOException {
     log.info("Reading tokens from sequence file " + tokenFilePath);
 
     try (Closer closer = Closer.create()) {
@@ -218,12 +221,12 @@ public class ProxiedFileSystemUtils {
       while (tokenReader.next(key, value)) {
         log.debug("Found token for user: " + key);
         if (key.toString().equals(userNameKey)) {
-          return Optional.<Token<?>> of(value);
+          return Collections.singletonList(value);
         }
       }
     }
     log.warn("Did not find any tokens for user " + userNameKey);
-    return Optional.absent();
+    return Collections.emptyList();
   }
 
   private static UserGroupInformation loginAndProxyAsUser(@NonNull String 
userNameToProxyAs,
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index fe2c78f39..c1fa45d8d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.util;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
@@ -364,12 +365,12 @@ public class WriterUtils {
       throws IOException {
     try {
       String user = state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME);
-      Optional<Token<?>> token = ProxiedFileSystemUtils
+      List<Token<?>> tokens = ProxiedFileSystemUtils
           .getTokenFromSeqFile(user, new 
Path(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_TOKEN_FILE)));
-      if (!token.isPresent()) {
+      if (tokens.isEmpty()) {
         throw new IOException("No token found for user " + user);
       }
-      return ProxiedFileSystemCache.fromToken().userNameToken(token.get())
+      return ProxiedFileSystemCache.fromTokens().userNameTokens(tokens)
           
.userNameToProxyAs(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME)).fsURI(uri)
           .conf(HadoopUtils.newConfiguration()).build();
     } catch (ExecutionException e) {

Reply via email to