leekeiabstraction commented on code in PR #1245:
URL: https://github.com/apache/fluss/pull/1245#discussion_r2657425765
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java:
##########
@@ -61,40 +87,115 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio
         org.apache.hadoop.conf.Configuration hadoopConfig =
                 mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
-        // set credential provider
-        setCredentialProvider(hadoopConfig);
+        final boolean isClient = isClient(flussConfig);
+        final boolean useTokenDelegation;
Review Comment:
Rename `useTokenDelegation` to `shouldServeToken` or `shouldDelegateToken`, as technically the client side still uses token delegation if the server is configured to perform token delegation.
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/S3FileSystemPlugin.java:
##########
@@ -61,40 +87,115 @@ public FileSystem create(URI fsUri, Configuration flussConfig) throws IOExceptio
         org.apache.hadoop.conf.Configuration hadoopConfig =
                 mirrorCertainHadoopConfig(getHadoopConfiguration(flussConfig));
-        // set credential provider
-        setCredentialProvider(hadoopConfig);
+        final boolean isClient = isClient(flussConfig);
+        final boolean useTokenDelegation;
+
+        if (isClient) {
+            // Only relevant on the server side, default to false
+            useTokenDelegation = false;
+            // We do not know if token delegation on the server will be activated or deactivated.
+            // Hence, we just add the Fluss credential provider for token delegation
+            // to the provider chain and the file system will figure out a valid provider
+            // at runtime.
+            setCredentialProviders(
+                    hadoopConfig,
+                    Collections.singletonList(DynamicTemporaryAWSCredentialsProvider.NAME));
+            S3ADelegationTokenReceiver.updateHadoopConfigAdditionalInfos(hadoopConfig);
+        } else {
+            useTokenDelegation =
+                    flussConfig.getBoolean(ConfigOptions.FILE_SYSTEM_S3_ENABLE_TOKEN_DELEGATION);
+        }
+
+        LOG.info("Hadoop configuration: {}", hadoopConfig);
-        // create the Hadoop FileSystem
         org.apache.hadoop.fs.FileSystem fs = new S3AFileSystem();
         fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
-        return new S3FileSystem(getScheme(), fs, hadoopConfig);
+
+        final Supplier<S3DelegationTokenProvider> delegationTokenProvider =
+                isClient
+                        ? () -> {
+                            throw new IllegalStateException(
+                                    "Unexpected usage of delegation token provider. Delegation token provider should only be used on the server side.");
+                        }
+                        : () -> {
+                            final S3DelegationTokenProvider.Type delegationTokenProviderType =
+                                    useTokenDelegation
+                                            ? S3DelegationTokenProvider.Type.STS_SESSION_TOKEN
+                                            : S3DelegationTokenProvider.Type.NO_TOKEN;
+                            return new S3DelegationTokenProvider(
+                                    getScheme(), hadoopConfig, delegationTokenProviderType);
+                        };
+
+        return new S3FileSystem(fs, delegationTokenProvider);
     }
+    /**
+     * Creates a Hadoop configuration and adds file system-related configurations contained in the
+     * Fluss configuration to the Hadoop configuration with a uniform prefix ({@link
+     * S3FileSystemPlugin#HADOOP_CONFIG_PREFIX}). For client configurations ({@link
+     * S3FileSystemPlugin#CLIENT_PREFIX}), only whitelisted configuration options are added.
+     *
+     * @param flussConfig The Fluss configuration.
+     * @return The Hadoop configuration.
+     */
+    @VisibleForTesting
     org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussConfig) {
         org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
         if (flussConfig == null) {
             return conf;
         }
-        for (String key : flussConfig.keySet()) {
-            for (String prefix : FLUSS_CONFIG_PREFIXES) {
-                if (key.startsWith(prefix)) {
-                    String newKey = HADOOP_CONFIG_PREFIX + key.substring(prefix.length());
+        for (String flussKey : flussConfig.keySet()) {
+            for (String flussPrefix : FLUSS_CONFIG_PREFIXES) {
+                if (flussKey.startsWith(flussPrefix)) {
+                    String hadoopConfigKey =
+                            HADOOP_CONFIG_PREFIX + flussKey.substring(flussPrefix.length());
                     String newValue =
                             flussConfig.getString(
-                                    ConfigBuilder.key(key).stringType().noDefaultValue(), null);
-                    conf.set(newKey, newValue);
+                                    ConfigBuilder.key(flussKey).stringType().noDefaultValue(),
+                                    null);
+                    conf.set(hadoopConfigKey, newValue);
                     LOG.debug(
-                            "Adding Fluss config entry for {} as {} to Hadoop config", key, newKey);
+                            "Adding Fluss config entry for {} as {} to Hadoop config",
+                            flussKey,
+                            hadoopConfigKey);
+                }
+
+                String flussKeyClientPrefix = CLIENT_PREFIX + flussPrefix;
+                if (flussKey.startsWith(flussKeyClientPrefix)) {
Review Comment:
Suggest the following to avoid an unnecessary allocation and comparison.
```java
else if (flussKey.startsWith(CLIENT_PREFIX + flussPrefix)) {
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -241,6 +247,17 @@ private static Configuration toFlussClientConfig(
                     }
                 });
+        // pass through all fluss options from flink config
+        try {
+            PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
Review Comment:
Lines 243 to 248 already forward table configs with the `client` prefix to the Fluss config.
This will overwrite any table-specific config if the name clashes. In such cases, should the table-specific config take precedence over the Flink config?
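If the table config should win, one option is a minimal sketch like the following (reusing the `PropertiesUtils` and `ConfigBuilder` calls already present in this PR; the precedence rule itself is only a suggestion):
```java
// Sketch only: table-level options take precedence; a flink-level
// `fluss.*` option is applied only when the table did not set the key.
PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
        .forEach(
                (key, value) -> {
                    String existing =
                            flussConfig.getString(
                                    ConfigBuilder.key(key).stringType().noDefaultValue(), null);
                    if (existing == null) {
                        flussConfig.setString(key, value);
                    }
                });
```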
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java:
##########
@@ -35,42 +40,54 @@
 /** Security token receiver for S3 filesystem. */
 public class S3DelegationTokenReceiver implements SecurityTokenReceiver {
-    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
-
     private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
     static volatile Credentials credentials;
-    static volatile Map<String, String> additionInfos;
-
-    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
-        LOG.info("Updating Hadoop configuration");
-
-        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
-        if (!providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
-            if (providers.isEmpty()) {
-                LOG.debug("Setting provider");
-                providers = DynamicTemporaryAWSCredentialsProvider.NAME;
+    static volatile Map<String, String> additionalInfos;
+
+    public static void updateHadoopConfigCredentialProviders(
+            org.apache.hadoop.conf.Configuration hadoopConfig, List<String> credentialProvider) {
+        LOG.info("Updating credential providers in Hadoop configuration");
+
+        String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
+        List<String> credentialProviderPrependOrder = new ArrayList<>(credentialProvider);
+        Collections.reverse(credentialProviderPrependOrder);
+
+        for (String credentialProviderName : credentialProviderPrependOrder) {
+            if (!providers.contains(credentialProviderName)) {
Review Comment:
We should use an exact match instead of `contains`: a substring check can report a provider as already configured when its class name is merely a substring of another entry in the comma-separated list.
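For illustration, a hedged sketch of an exact per-entry check, splitting the comma-separated Hadoop value (`java.util.Arrays` assumed imported):
```java
// Sketch: compare against each entry of the comma-separated provider list
// instead of substring matching on the whole string.
boolean alreadyConfigured =
        Arrays.stream(providers.split(","))
                .map(String::trim)
                .anyMatch(credentialProviderName::equals);
if (!alreadyConfigured) {
    providers =
            providers.isEmpty()
                    ? credentialProviderName
                    : credentialProviderName + "," + providers;
}
```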
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java:
##########
@@ -241,6 +247,17 @@ private static Configuration toFlussClientConfig(
                     }
                 });
+        // pass through all fluss options from flink config
+        try {
+            PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
+                    .forEach(flussConfig::setString);
Review Comment:
Should we enforce/prepend the `client.` prefix here? Technically, all config in FlinkTableFactory is Fluss client-side config. As is, we are leaking non-client config through if the user missed the `client` part. I believe that is what leads to the best-effort handling logic we have within `isClient(configuration)`.
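As a sketch of the prepending idea (the literal `"client."` below stands in for whatever constant Fluss actually uses for the client prefix):
```java
// Sketch only: normalize forwarded keys into the client namespace so that
// non-client config cannot leak through. "client." is assumed to match the
// actual Fluss client prefix constant.
PropertiesUtils.extractAndRemovePrefix(flinkConfig.toMap(), FLUSS_PREFIX)
        .forEach(
                (key, value) -> {
                    String clientKey = key.startsWith("client.") ? key : "client." + key;
                    flussConfig.setString(clientKey, value);
                });
```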
##########
fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenReceiver.java:
##########
@@ -35,42 +40,54 @@
 /** Security token receiver for S3 filesystem. */
 public class S3DelegationTokenReceiver implements SecurityTokenReceiver {
-    public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
-
     private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
     static volatile Credentials credentials;
-    static volatile Map<String, String> additionInfos;
-
-    public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) {
-        LOG.info("Updating Hadoop configuration");
-
-        String providers = hadoopConfig.get(PROVIDER_CONFIG_NAME, "");
-        if (!providers.contains(DynamicTemporaryAWSCredentialsProvider.NAME)) {
-            if (providers.isEmpty()) {
-                LOG.debug("Setting provider");
-                providers = DynamicTemporaryAWSCredentialsProvider.NAME;
+    static volatile Map<String, String> additionalInfos;
+
+    public static void updateHadoopConfigCredentialProviders(
+            org.apache.hadoop.conf.Configuration hadoopConfig, List<String> credentialProvider) {
+        LOG.info("Updating credential providers in Hadoop configuration");
+
+        String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
+        List<String> credentialProviderPrependOrder = new ArrayList<>(credentialProvider);
+        Collections.reverse(credentialProviderPrependOrder);
+
+        for (String credentialProviderName : credentialProviderPrependOrder) {
+            if (!providers.contains(credentialProviderName)) {
Review Comment:
Also, if I'm reading correctly, we currently only add a single credential provider, `Collections.singletonList(DynamicTemporaryAWSCredentialsProvider.NAME)`, from S3FileSystemPlugin.java:102. Do we need to handle updating and adding a list of credential providers anywhere? If not, it might be worth simplifying the code and removing the list-based updating/adding of credential providers.
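If we only ever prepend one provider, the method could collapse to something like the following sketch (`prependCredentialProvider` is an illustrative name, not an existing method; `java.util.Arrays` assumed imported):
```java
// Sketch only: single-provider variant without the reversed-list handling.
public static void prependCredentialProvider(
        org.apache.hadoop.conf.Configuration hadoopConfig, String providerName) {
    String providers = hadoopConfig.get(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, "");
    boolean alreadyConfigured =
            Arrays.stream(providers.split(",")).map(String::trim).anyMatch(providerName::equals);
    if (providers.isEmpty()) {
        hadoopConfig.set(S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME, providerName);
    } else if (!alreadyConfigured) {
        hadoopConfig.set(
                S3ConfigOptions.CREDENTIALS_PROVIDER_CONFIG_NAME,
                providerName + "," + providers);
    }
}
```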
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]