This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c6f9f42a4 create proxied file system using multiple tokens (#3744)
c6f9f42a4 is described below
commit c6f9f42a4a81fb81a5b5701ae57decba541d0516
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 17 16:32:45 2023 -0700
create proxied file system using multiple tokens (#3744)
---
.../gobblin/util/ProxiedFileSystemCache.java | 23 +++++++++++++++++++---
.../gobblin/util/ProxiedFileSystemUtils.java | 23 ++++++++++++----------
.../java/org/apache/gobblin/util/WriterUtils.java | 7 ++++---
3 files changed, 37 insertions(+), 16 deletions(-)
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
index 0f4e94ac2..466dd6dc4 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemCache.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.util;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -205,7 +207,9 @@ public class ProxiedFileSystemCache {
/**
* Cached version of {@link
ProxiedFileSystemUtils#createProxiedFileSystemUsingToken(String, Token, URI,
Configuration)}.
+ * Deprecated in favor of {@link #getProxiedFileSystemUsingTokens}
*/
+ @Deprecated
@Builder(builderClassName = "ProxiedFileSystemFromToken", builderMethodName
= "fromToken")
private static FileSystem getProxiedFileSystemUsingToken(@NonNull String
userNameToProxyAs, Token<?> userNameToken,
URI fsURI, Configuration conf, FileSystem referenceFS) throws
IOException, ExecutionException {
@@ -215,7 +219,20 @@ public class ProxiedFileSystemCache {
Configuration actualConfiguration = resolveConfiguration(conf,
referenceFS);
return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI,
userNameToProxyAs, referenceFS),
- new CreateProxiedFileSystemFromToken(userNameToProxyAs, userNameToken,
actualURI, actualConfiguration,
+ new CreateProxiedFileSystemFromToken(userNameToProxyAs,
Collections.singletonList(userNameToken), actualURI, actualConfiguration,
+ referenceFS));
+ }
+
+ @Builder(builderClassName = "ProxiedFileSystemFromTokens", builderMethodName
= "fromTokens")
+ private static FileSystem getProxiedFileSystemUsingTokens(@NonNull String
userNameToProxyAs, List<Token<?>> userNameTokens,
+ URI fsURI, Configuration conf, FileSystem referenceFS) throws
IOException, ExecutionException {
+ Preconditions.checkNotNull(userNameToProxyAs, "Must provide a user name to
proxy as.");
+ Preconditions.checkNotNull(userNameTokens, "Must provide token for user to
proxy.");
+ URI actualURI = resolveUri(fsURI, conf, referenceFS);
+ Configuration actualConfiguration = resolveConfiguration(conf,
referenceFS);
+
+ return USER_NAME_TO_FILESYSTEM_CACHE.get(getFileSystemKey(actualURI,
userNameToProxyAs, referenceFS),
+ new CreateProxiedFileSystemFromToken(userNameToProxyAs,
userNameTokens, actualURI, actualConfiguration,
referenceFS));
}
@@ -272,7 +289,7 @@ public class ProxiedFileSystemCache {
@NonNull
private final String userNameToProxyAs;
@NonNull
- private final Token<?> userNameToken;
+ private final List<Token<?>> userNameTokens;
@NonNull
private final URI uri;
@NonNull
@@ -282,7 +299,7 @@ public class ProxiedFileSystemCache {
@Override
public FileSystem call() throws Exception {
FileSystem fs =
ProxiedFileSystemUtils.createProxiedFileSystemUsingToken(this.userNameToProxyAs,
- this.userNameToken, this.uri, this.configuration);
+ this.userNameTokens, this.uri, this.configuration);
if (this.referenceFS != null) {
return decorateFilesystemFromReferenceFS(fs, this.referenceFS);
}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
index 3626199b3..6820d651e 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/ProxiedFileSystemUtils.java
@@ -20,9 +20,10 @@ package org.apache.gobblin.util;
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
@@ -86,10 +87,10 @@ public class ProxiedFileSystemUtils {
case TOKEN:
Preconditions.checkArgument(properties.containsKey(AUTH_TOKEN_PATH));
Path tokenPath = new Path(properties.getProperty(AUTH_TOKEN_PATH));
- Optional<Token<?>> proxyToken = getTokenFromSeqFile(userNameToProxyAs,
tokenPath);
- if (proxyToken.isPresent()) {
+ List<Token<?>> proxyTokens = getTokenFromSeqFile(userNameToProxyAs,
tokenPath);
+ if (proxyTokens.size() > 0) {
try {
- return createProxiedFileSystemUsingToken(userNameToProxyAs,
proxyToken.get(), fsURI, conf);
+ return createProxiedFileSystemUsingToken(userNameToProxyAs,
proxyTokens, fsURI, conf);
} catch (InterruptedException e) {
throw new IOException("Failed to proxy as user " +
userNameToProxyAs, e);
}
@@ -168,17 +169,19 @@ public class ProxiedFileSystemUtils {
* method to create a {@link FileSystem}.
*
* @param userNameToProxyAs The name of the user the super user should proxy
as
- * @param userNameToken The {@link Token} to add to the proxied user's
{@link UserGroupInformation}.
+ * @param userNameTokens List of {@link Token}s to add to the proxied user's
{@link UserGroupInformation}.
* @param fsURI The {@link URI} for the {@link FileSystem} that should be
created
* @param conf The {@link Configuration} for the {@link FileSystem} that
should be created
*
* @return a {@link FileSystem} that can execute commands on behalf of the
specified userNameToProxyAs
*/
static FileSystem createProxiedFileSystemUsingToken(@NonNull String
userNameToProxyAs,
- @NonNull Token<?> userNameToken, URI fsURI, Configuration conf) throws
IOException, InterruptedException {
+ @NonNull List<Token<?>> userNameTokens, URI fsURI, Configuration conf)
throws IOException, InterruptedException {
UserGroupInformation ugi =
UserGroupInformation.createProxyUser(userNameToProxyAs,
UserGroupInformation.getLoginUser());
- ugi.addToken(userNameToken);
+ for (Token<?> userNameToken : userNameTokens) {
+ ugi.addToken(userNameToken);
+ }
return ugi.doAs(new ProxiedFileSystem(fsURI, conf));
}
@@ -205,7 +208,7 @@ public class ProxiedFileSystemUtils {
*
* @return A {@link Token} for the given user name
*/
- public static Optional<Token<?>> getTokenFromSeqFile(String userNameKey,
Path tokenFilePath) throws IOException {
+ public static List<Token<?>> getTokenFromSeqFile(String userNameKey, Path
tokenFilePath) throws IOException {
log.info("Reading tokens from sequence file " + tokenFilePath);
try (Closer closer = Closer.create()) {
@@ -218,12 +221,12 @@ public class ProxiedFileSystemUtils {
while (tokenReader.next(key, value)) {
log.debug("Found token for user: " + key);
if (key.toString().equals(userNameKey)) {
- return Optional.<Token<?>> of(value);
+ return Collections.singletonList(value);
}
}
}
log.warn("Did not find any tokens for user " + userNameKey);
- return Optional.absent();
+ return Collections.emptyList();
}
private static UserGroupInformation loginAndProxyAsUser(@NonNull String
userNameToProxyAs,
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
index fe2c78f39..c1fa45d8d 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.util;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -364,12 +365,12 @@ public class WriterUtils {
throws IOException {
try {
String user = state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME);
- Optional<Token<?>> token = ProxiedFileSystemUtils
+ List<Token<?>> tokens = ProxiedFileSystemUtils
.getTokenFromSeqFile(user, new
Path(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_TOKEN_FILE)));
- if (!token.isPresent()) {
+ if (tokens.isEmpty()) {
throw new IOException("No token found for user " + user);
}
- return ProxiedFileSystemCache.fromToken().userNameToken(token.get())
+ return ProxiedFileSystemCache.fromTokens().userNameTokens(tokens)
.userNameToProxyAs(state.getProp(ConfigurationKeys.FS_PROXY_AS_USER_NAME)).fsURI(uri)
.conf(HadoopUtils.newConfiguration()).build();
} catch (ExecutionException e) {