leekeiabstraction commented on code in PR #1245:
URL: https://github.com/apache/fluss/pull/1245#discussion_r2657425765


##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java:
##########
@@ -61,40 +87,115 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio
         org.apache.hadoop.conf.Configuration hadoopConfig =
                 mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
 
-        // set credential provider
-        setCredentialProvider(hadoopConfig);
+        final boolean isClient = isClient(flussConfig);
+        final boolean useTokenDelegation;

Review Comment:
   Rename `useTokenDelegation` to `shouldServeToken` or `shouldDelegateToken`, since technically the client side still uses token delegation when the server is configured to perform it.
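   
   For illustration, the rename would look like (purely a naming sketch):
   
   ```java
   final boolean isClient = isClient(flussConfig);
   final boolean shouldDelegateToken;
   ```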



##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java:
##########
@@ -61,40 +87,115 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio
         org.apache.hadoop.conf.Configuration hadoopConfig =
                 mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
 
-        // set credential provider
-        setCredentialProvider(hadoopConfig);
+        final boolean isClient = isClient(flussConfig);
+        final boolean useTokenDelegation;
+
+        if (isClient) {
+            // Only relevant on the server side, default to false
+            useTokenDelegation = false;
+            // We do not know if token delegation on the server will be activated or deactivated.
+            // Hence, we just add the Fluss credential provider for token delegation
+            // to the provider chain and the file system will figure out a valid provider
+            // at runtime.
+            setCredentialProviders(
+                    hadoopConfig,
+                    Collections.singletonList(DynamicTemporaryAWSCredentialsProvider.NAME));
+            S3ADelegationTokenReceiver.updateHadoopConfigAdditionalInfos(hadoopConfig);
+        } else {
+            useTokenDelegation =
+                    flussConfig.getBoolean(ConfigOptions.FILE_SYSTEM_S3_ENABLE_TOKEN_DELEGATION);
+        }
+
+        LOG.info("Hadoop configuration: {}", hadoopConfig);
 
-        // create the Hadoop FileSystem
         org.apache.hadoop.fs.FileSystem fs = new S3AFileSystem();
         fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-        return new S3FileSystem(getScheme(), fs, hadoopConfig);
+
+        final Supplier<S3DelegationTokenProvider> delegationTokenProvider =
+                isClient
+                        ? () -> {
+                            throw new IllegalStateException(
+                                    "Unexpected usage of delegation token provider. Delegation token provider should only be used on the server side.");
+                        }
+                        : () -> {
+                            final S3DelegationTokenProvider.Type delegationTokenProviderType =
+                                    useTokenDelegation
+                                            ? S3DelegationTokenProvider.Type.STS_SESSION_TOKEN
+                                            : S3DelegationTokenProvider.Type.NO_TOKEN;
+                            return new S3DelegationTokenProvider(
+                                    getScheme(), hadoopConfig, delegationTokenProviderType);
+                        };
+
+        return new S3FileSystem(fs, delegationTokenProvider);
     }
 
+    /**
+     * Creates a Hadoop configuration and adds file system-related configurations contained in the
+     * Fluss configuration to the Hadoop configuration with a uniform prefix ({@link
+     * S3FileSystemPlugin#HADOOP_CONFIG_PREFIX}). For client configurations ({@link
+     * S3FileSystemPlugin#CLIENT_PREFIX}), only whitelisted configuration options are added.
+     *
+     * @param flussConfig The Fluss configuration.
+     * @return The Hadoop configuration.
+     */
+    @VisibleForTesting
     org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussConfig) {
         org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
         if (flussConfig == null) {
             return conf;
         }
 
-        for (String key : flussConfig.keySet()) {
-            for (String prefix : FLUSS_CONFIG_PREFIXES) {
-                if (key.startsWith(prefix)) {
-                    String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
+        for (String flussKey : flussConfig.keySet()) {
+            for (String flussPrefix : FLUSS_CONFIG_PREFIXES) {
+                if (flussKey.startsWith(flussPrefix)) {
+                    String hadoopConfigKey =
+                            HADOOP_CONFIG_PREFIX + flussKey.substring(flussPrefix.length());
                     String newValue =
                             flussConfig.getString(
-                                    ConfigBuilder.key(key).stringType().noDefaultValue(), null);
-                    conf.set(newKey, newValue);
+                                    ConfigBuilder.key(flussKey).stringType().noDefaultValue(),
+                                    null);
+                    conf.set(hadoopConfigKey, newValue);
 
                     LOG.debug(
-                            "Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
+                            "Adding Fluss config entry for {} as {} to Hadoop config",
+                            flussKey,
+                            hadoopConfigKey);
+                }
+
+                String flussKeyClientPrefix = CLIENT_PREFIX + flussPrefix;
+                if (flussKey.startsWith(flussKeyClientPrefix)) {

Review Comment:
   Suggest the following to avoid an unnecessary allocation and comparison.
   
   ```java
   else if (flussKey.startsWith(CLIENT_PREFIX + flussPrefix)) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -241,6 +247,17 @@ private static Configuration toFlussClientConfig(
                     }
                 });
 
+        // pass through all fluss options from flink config
+        try {
+            PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)

Review Comment:
   Lines 243 to 248 already forward table configs with the `client` prefix to the Fluss config.
   
   This will override any table-specific config if the names clash. In such cases, should the table-specific config take precedence over the Flink config?
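   
   A minimal sketch of one way to let the table-specific config win, assuming the Fluss `Configuration` exposes a `containsKey`-style lookup (adjust to the actual API):
   
   ```java
   // Only forward a Flink-level option when the table-specific pass above
   // has not already set the same key.
   PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
           .forEach(
                   (key, value) -> {
                       if (!flussConfig.containsKey(key)) {
                           flussConfig.setString(key, value);
                       }
                   });
   ```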



##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java:
##########
@@ -35,42 +40,54 @@
 /** Security token receiver for S3 filesystem. */
 public class S3DelegationTokenReceiver implements SecurityTokenReceiver {
 
-    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
-
     private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
 
     static volatile Credentials credentials;
-    static volatile Map<String, String> additionInfos;
-
-    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
-        LOG.info("Updating Hadoop configuration");
-
-        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
-        if (!providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
-            if (providers.isEmpty()) {
-                LOG.debug("Setting provider");
-                providers = DynamicTemporaryAWSCredentialsProvider.NAME;
+    static volatile Map<String, String> additionalInfos;
+
+    public static void updateHadoopConfigCredentialProviders(
+            org.apache.hadoop.conf.Configuration hadoopConfig, List<String> credentialProvider) {
+        LOG.info("Updating credential providers in Hadoop configuration");
+
+        String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
+        List<String> credentialProviderPrependOrder = new ArrayList<>(credentialProvider);
+        Collections.reverse(credentialProviderPrependOrder);
+
+        for (String credentialProviderName : credentialProviderPrependOrder) {
+            if (!providers.contains(credentialProviderName)) {

Review Comment:
   We should use an exact match instead of `contains`: a substring check can report a false positive when one provider's class name is a substring of another's.
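   
   A minimal sketch of an exact-match check against the comma-separated provider list (uses `java.util.Arrays`):
   
   ```java
   boolean alreadyPresent =
           Arrays.stream(providers.split(","))
                   .map(String::trim)
                   .anyMatch(credentialProviderName::equals);
   if (!alreadyPresent) {
       // prepend credentialProviderName as before
   }
   ```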



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -241,6 +247,17 @@ private static Configuration toFlussClientConfig(
                     }
                 });
 
+        // pass through all fluss options from flink config
+        try {
+            PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
+                    .forEach(flussConfig::setString);

Review Comment:
   Should we enforce/prepend the `client.` prefix here? Technically, all config in `FlinkTableFactory` is Fluss client-side config. As is, we are leaking non-client config through if the user leaves out the `client.` part. I believe that is what leads to the best-effort handling logic we have within `isClient(configuration)`.
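   
   A rough sketch of what enforcing the prefix could look like, assuming a local `CLIENT_PREFIX` constant holding `"client."` (hypothetical name):
   
   ```java
   PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
           .forEach(
                   (key, value) -> {
                       // Normalize everything forwarded from Flink into the
                       // client.* namespace, since this factory only builds
                       // client-side config.
                       String clientKey =
                               key.startsWith(CLIENT_PREFIX) ? key : CLIENT_PREFIX + key;
                       flussConfig.setString(clientKey, value);
                   });
   ```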



##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java:
##########
@@ -35,42 +40,54 @@
 /** Security token receiver for S3 filesystem. */
 public class S3DelegationTokenReceiver implements SecurityTokenReceiver {
 
-    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
-
     private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
 
     static volatile Credentials credentials;
-    static volatile Map<String, String> additionInfos;
-
-    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
-        LOG.info("Updating Hadoop configuration");
-
-        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
-        if (!providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
-            if (providers.isEmpty()) {
-                LOG.debug("Setting provider");
-                providers = DynamicTemporaryAWSCredentialsProvider.NAME;
+    static volatile Map<String, String> additionalInfos;
+
+    public static void updateHadoopConfigCredentialProviders(
+            org.apache.hadoop.conf.Configuration hadoopConfig, List<String> credentialProvider) {
+        LOG.info("Updating credential providers in Hadoop configuration");
+
+        String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
+        List<String> credentialProviderPrependOrder = new ArrayList<>(credentialProvider);
+        Collections.reverse(credentialProviderPrependOrder);
+
+        for (String credentialProviderName : credentialProviderPrependOrder) {
+            if (!providers.contains(credentialProviderName)) {

Review Comment:
   Also, if I'm reading correctly, we currently only add a single credential provider, `Collections.singletonList(DynamicTemporaryAWSCredentialsProvider.NAME)`, from S3FileSystemPlugin.java:102. Do we need to handle updating and adding a list of credential providers anywhere? If not, it might be worth simplifying the code and removing the list handling.
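   
   If we go the single-provider route, a simplified sketch (under the assumption that only one provider is ever prepended; `prependCredentialProvider` is a hypothetical name, and `java.util.Arrays` is used for the exact-match check):
   
   ```java
   public static void prependCredentialProvider(
           org.apache.hadoop.conf.Configuration hadoopConfig, String providerName) {
       String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
       if (providers.isEmpty()) {
           hadoopConfig.set(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, providerName);
       } else if (!Arrays.asList(providers.split(",")).contains(providerName)) {
           // Prepend so the Fluss provider is consulted first.
           hadoopConfig.set(
                   S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME,
                   providerName + "," + providers);
       }
   }
   ```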



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
