This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new af4c68ab8c9 [FLINK-31839][filesystems] Fix flink-s3-fs-hadoop and flink-s3-fs-presto plugin collision
af4c68ab8c9 is described below

commit af4c68ab8c910ffc4e02ec3c0b07fe727d867303
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Apr 19 18:49:15 2023 +0200

    [FLINK-31839][filesystems] Fix flink-s3-fs-hadoop and flink-s3-fs-presto plugin collision
    
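    The two plugins collided because both registered their delegation token
    provider and receiver under the same service name ("s3"). Each plugin now
    registers under its own name ("s3-hadoop" and "s3-presto").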
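
The collision can be illustrated with a minimal, self-contained sketch. It assumes a registry that keys services by `serviceName()` and rejects duplicates; `NamedService` and `register` are hypothetical names used only for illustration and are not the Flink API, whose loading logic is not shown in this message.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ServiceNameCollisionSketch {

    /** Hypothetical stand-in for a token provider or receiver; only the name matters here. */
    interface NamedService {
        String serviceName();
    }

    /** Registers services by name and rejects a second service with an already-used name. */
    static Map<String, NamedService> register(List<NamedService> services) {
        Map<String, NamedService> byName = new LinkedHashMap<>();
        for (NamedService service : services) {
            NamedService previous = byName.putIfAbsent(service.serviceName(), service);
            if (previous != null) {
                throw new IllegalStateException(
                        "Service name is not unique: " + service.serviceName());
            }
        }
        return byName;
    }

    public static void main(String[] args) {
        // After the fix: each plugin reports its own name, so both register.
        NamedService hadoop = () -> "s3-hadoop";
        NamedService presto = () -> "s3-presto";
        System.out.println(register(List.of(hadoop, presto)).keySet());

        // Before the fix: both plugins reported "s3", so the second one collides.
        NamedService oldHadoop = () -> "s3";
        NamedService oldPresto = () -> "s3";
        try {
            register(List.of(oldHadoop, oldPresto));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keying by name is what makes a shared `"s3"` name a problem as soon as both plugins are on the classpath; distinct names let the two coexist in the registry.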
---
 .../security/security-delegation-token.md          | 21 ++++++++-----
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |  4 +--
 ...java => AbstractS3DelegationTokenProvider.java} | 10 ++-----
 ...java => AbstractS3DelegationTokenReceiver.java} | 12 +++-----
 .../DynamicTemporaryAWSCredentialsProvider.java    |  2 +-
 ... => AbstractS3DelegationTokenProviderTest.java} | 23 ++++++++++----
 ... => AbstractS3DelegationTokenReceiverTest.java} | 35 ++++++++++++++--------
 ...DynamicTemporaryAWSCredentialsProviderTest.java | 12 ++++++--
 .../token/S3HadoopDelegationTokenProvider.java     | 31 +++++++++++++++++++
 .../token/S3HadoopDelegationTokenReceiver.java     | 31 +++++++++++++++++++
 ...ink.core.security.token.DelegationTokenProvider |  2 +-
 ...ink.core.security.token.DelegationTokenReceiver |  2 +-
 .../token/S3PrestoDelegationTokenProvider.java     | 31 +++++++++++++++++++
 .../token/S3PrestoDelegationTokenReceiver.java     | 31 +++++++++++++++++++
 ...ink.core.security.token.DelegationTokenProvider |  2 +-
 ...ink.core.security.token.DelegationTokenReceiver |  2 +-
 .../token/DefaultDelegationTokenManager.java       | 26 ++++++++++++++--
 .../token/DefaultDelegationTokenManagerTest.java   | 24 +++++++++++++++
 18 files changed, 248 insertions(+), 53 deletions(-)

diff --git a/docs/content/docs/deployment/security/security-delegation-token.md b/docs/content/docs/deployment/security/security-delegation-token.md
index 16ce2228783..a7f1876d595 100644
--- a/docs/content/docs/deployment/security/security-delegation-token.md
+++ b/docs/content/docs/deployment/security/security-delegation-token.md
@@ -213,23 +213,30 @@ loaded and then will be overwritten by the loading mechanism in Flink.
 
 There are certain limitations to bear in mind when talking about DTs.
 
-Firstly, not all DTs actually expose their renewal period. This is a service configuration that is 
+* Not all DTs actually expose their renewal period. This is a service configuration that is 
 not generally exposed to clients. For this reason, certain DT providers cannot provide a renewal period, 
 thus requiring that the service's configuration is in some way synchronized with another service
-that does provide that information.
-
+that does provide that information.  
 The HDFS service, which is generally available when DTs are needed in the first place, provides
 this information, so in general it's a good idea for all services using DTs to use the same
 configuration as HDFS for the renewal period.
 
-Secondly, Flink is not parsing the user application code, so it doesn't know which delegation
+* Flink is not parsing the user application code, so it doesn't know which delegation
 tokens will be needed. This means that Flink will try to get as many delegation tokens as is possible
 based on the configuration available. That means that if an HBase token provider is enabled but the app
 doesn't actually use HBase, a DT will still be generated. The user would have to explicitly
 disable the mentioned provider in that case.
 
-Thirdly, it is challenging to create DTs "on demand". Flink obtains/distributes tokens upfront
-and re-obtains/re-distributes them periodically.
-
+* It is challenging to create DTs "on demand". Flink obtains/distributes tokens upfront
+and re-obtains/re-distributes them periodically.  
 The advantage, though, is that user code does not need to worry about DTs, since Flink will handle
 them transparently when the proper configuration is available.
+
+* There are external file system plugins which are authenticating to the same service. One good example
+is `s3-hadoop` and `s3-presto`. They both authenticate to S3. They're having different service names
+but obtaining tokens for the same service which might cause unintended consequences. Since they're
+obtaining tokens for the same service they store these tokens at the same place. It's easy to see that
+if they're used together with the same credentials then there will be no issues since the tokens are
+going to be overwritten by each other in a single-threaded way (which belongs to a single user).
+However, if the plugins are configured with different user credentials then the token which will be
+used for data processing can belong to any of the users which is non-deterministic.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 0dac58be3bf..285d0b96098 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
 import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.runtime.util.HadoopConfigLoader;
 import org.apache.flink.util.Preconditions;
@@ -124,7 +124,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
             // create the Hadoop FileSystem
             org.apache.hadoop.conf.Configuration hadoopConfig =
                     hadoopConfigLoader.getOrLoadHadoopConfig();
-            S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+            AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
             org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
             fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
similarity index 94%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
index 4c04b975908..82e99dcde89 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
@@ -38,19 +38,15 @@ import java.util.Optional;
 
 /** Delegation token provider for S3 filesystems. */
 @Internal
-public class S3DelegationTokenProvider implements DelegationTokenProvider {
+public abstract class AbstractS3DelegationTokenProvider implements DelegationTokenProvider {
 
-    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenProvider.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractS3DelegationTokenProvider.class);
 
     private String region;
     private String accessKey;
     private String secretKey;
 
-    @Override
-    public String serviceName() {
-        return "s3";
-    }
-
     @Override
     public void init(Configuration configuration) {
         region = configuration.getString(String.format("%s.region", serviceConfigPrefix()), null);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
similarity index 91%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
index 55dd18d3c4b..bad552f432b 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
@@ -34,11 +34,12 @@ import javax.annotation.Nullable;
 
 /** Delegation token receiver for S3 filesystems. */
 @Internal
-public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
+public abstract class AbstractS3DelegationTokenReceiver implements DelegationTokenReceiver {
 
     public static final String PROVIDER_CONFIG_NAME = "fs.s3a.aws.credentials.provider";
 
-    private static final Logger LOG = LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AbstractS3DelegationTokenReceiver.class);
 
     @VisibleForTesting @Nullable static volatile Credentials credentials;
 
@@ -69,11 +70,6 @@ public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
         LOG.info("Updated Hadoop configuration successfully");
     }
 
-    @Override
-    public String serviceName() {
-        return "s3";
-    }
-
     @Override
     public void init(Configuration configuration) {
         region =
@@ -92,7 +88,7 @@ public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
         LOG.info("Updating session credentials");
         credentials =
                 InstantiationUtil.deserializeObject(
-                        tokens, S3DelegationTokenReceiver.class.getClassLoader());
+                        tokens, AbstractS3DelegationTokenReceiver.class.getClassLoader());
         LOG.info(
                 "Session credentials updated successfully with access key: {} expiration: {}",
                 credentials.getAccessKeyId(),
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
index 8760f1aa3ee..8b04ce1a118 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
@@ -47,7 +47,7 @@ public class DynamicTemporaryAWSCredentialsProvider implements AWSCredentialsPro
 
     @Override
     public AWSCredentials getCredentials() throws SdkBaseException {
-        Credentials credentials = S3DelegationTokenReceiver.getCredentials();
+        Credentials credentials = AbstractS3DelegationTokenReceiver.getCredentials();
         if (credentials == null) {
             throw new NoAwsCredentialsException(COMPONENT);
         }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
similarity index 78%
rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
index d0109cbc44d..46f1f32765f 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
@@ -20,30 +20,41 @@ package org.apache.flink.fs.s3.common.token;
 
 import org.apache.flink.configuration.Configuration;
 
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-/** Tests for {@link S3DelegationTokenProvider}. */
-public class S3DelegationTokenProviderTest {
+/** Tests for {@link AbstractS3DelegationTokenProvider}. */
+public class AbstractS3DelegationTokenProviderTest {
 
     private static final String REGION = "testRegion";
     private static final String ACCESS_KEY_ID = "testAccessKeyId";
     private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
 
+    private AbstractS3DelegationTokenProvider provider;
+
+    @BeforeEach
+    public void beforeEach() {
+        provider =
+                new AbstractS3DelegationTokenProvider() {
+                    @Override
+                    public String serviceName() {
+                        return "s3";
+                    }
+                };
+    }
+
     @Test
     public void delegationTokensRequiredShouldReturnFalseWithoutCredentials() {
-        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
         provider.init(new Configuration());
-
         assertFalse(provider.delegationTokensRequired());
     }
 
     @Test
     public void delegationTokensRequiredShouldReturnTrueWithCredentials() {
-        S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
         Configuration configuration = new Configuration();
         configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
         configuration.setString(CONFIG_PREFIX + ".s3.access-key", ACCESS_KEY_ID);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
similarity index 75%
rename from flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
rename to flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
index ab817b0a532..0ba43df32f9 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
@@ -20,29 +20,29 @@ package org.apache.flink.fs.s3.common.token;
 
 import org.apache.flink.configuration.Configuration;
 
-import org.junit.Test;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
-import static org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
+import static org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-/** Tests for {@link S3DelegationTokenReceiver}. */
-public class S3DelegationTokenReceiverTest {
+/** Tests for {@link AbstractS3DelegationTokenReceiver}. */
+public class AbstractS3DelegationTokenReceiverTest {
 
     private static final String PROVIDER_CLASS_NAME = "TestProvider";
     private static final String REGION = "testRegion";
 
     @BeforeEach
     void beforeEach() {
-        S3DelegationTokenReceiver.region = null;
+        AbstractS3DelegationTokenReceiver.region = null;
     }
 
     @AfterEach
     void afterEach() {
-        S3DelegationTokenReceiver.region = null;
+        AbstractS3DelegationTokenReceiver.region = null;
     }
 
     @Test
@@ -50,7 +50,7 @@ public class S3DelegationTokenReceiverTest {
         org.apache.hadoop.conf.Configuration hadoopConfiguration =
                 new org.apache.hadoop.conf.Configuration();
         hadoopConfiguration.set(PROVIDER_CONFIG_NAME, "");
-        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertEquals(
                 DynamicTemporaryAWSCredentialsProvider.NAME,
                 hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
@@ -61,7 +61,7 @@ public class S3DelegationTokenReceiverTest {
         org.apache.hadoop.conf.Configuration hadoopConfiguration =
                 new org.apache.hadoop.conf.Configuration();
         hadoopConfiguration.set(PROVIDER_CONFIG_NAME, PROVIDER_CLASS_NAME);
-        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         String[] providers = hadoopConfiguration.get(PROVIDER_CONFIG_NAME).split(",");
         assertEquals(2, providers.length);
         assertEquals(DynamicTemporaryAWSCredentialsProvider.NAME, providers[0]);
@@ -73,7 +73,7 @@ public class S3DelegationTokenReceiverTest {
         org.apache.hadoop.conf.Configuration hadoopConfiguration =
                 new org.apache.hadoop.conf.Configuration();
         hadoopConfiguration.set(PROVIDER_CONFIG_NAME, DynamicTemporaryAWSCredentialsProvider.NAME);
-        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertEquals(
                 DynamicTemporaryAWSCredentialsProvider.NAME,
                 hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
@@ -81,25 +81,34 @@ public class S3DelegationTokenReceiverTest {
 
     @Test
     public void updateHadoopConfigShouldNotUpdateRegionWhenNotConfigured() {
-        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        AbstractS3DelegationTokenReceiver receiver = createReceiver();
         receiver.init(new Configuration());
 
         org.apache.hadoop.conf.Configuration hadoopConfiguration =
                 new org.apache.hadoop.conf.Configuration();
-        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertNull(hadoopConfiguration.get("fs.s3a.endpoint.region"));
     }
 
     @Test
     public void updateHadoopConfigShouldUpdateRegionWhenConfigured() {
-        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        AbstractS3DelegationTokenReceiver receiver = createReceiver();
         Configuration configuration = new Configuration();
         configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
         receiver.init(configuration);
 
         org.apache.hadoop.conf.Configuration hadoopConfiguration =
                 new org.apache.hadoop.conf.Configuration();
-        S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+        AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertEquals(REGION, hadoopConfiguration.get("fs.s3a.endpoint.region"));
     }
+
+    private AbstractS3DelegationTokenReceiver createReceiver() {
+        return new AbstractS3DelegationTokenReceiver() {
+            @Override
+            public String serviceName() {
+                return "s3";
+            }
+        };
+    }
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
index a6b926f8737..8115b61a711 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
@@ -39,12 +39,12 @@ public class DynamicTemporaryAWSCredentialsProviderTest {
 
     @BeforeEach
     void beforeEach() {
-        S3DelegationTokenReceiver.credentials = null;
+        AbstractS3DelegationTokenReceiver.credentials = null;
     }
 
     @AfterEach
     void afterEach() {
-        S3DelegationTokenReceiver.credentials = null;
+        AbstractS3DelegationTokenReceiver.credentials = null;
     }
 
     @Test
@@ -61,7 +61,13 @@ public class DynamicTemporaryAWSCredentialsProviderTest {
                 new DynamicTemporaryAWSCredentialsProvider();
         Credentials credentials =
                 new Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY, SESSION_TOKEN, null);
-        S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+        AbstractS3DelegationTokenReceiver receiver =
+                new AbstractS3DelegationTokenReceiver() {
+                    @Override
+                    public String serviceName() {
+                        return "s3";
+                    }
+                };
 
         receiver.onNewTokensObtained(InstantiationUtil.serializeObject(credentials));
         BasicSessionCredentials returnedCredentials =
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java
new file mode 100644
index 00000000000..a6938546077
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenProvider;
+
+/** Delegation token provider for S3 Hadoop filesystems. */
+@Internal
+public class S3HadoopDelegationTokenProvider extends AbstractS3DelegationTokenProvider {
+    @Override
+    public String serviceName() {
+        return "s3-hadoop";
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java
new file mode 100644
index 00000000000..7cc2da0cbee
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
+
+/** Delegation token receiver for S3 Hadoop filesystems. */
+@Internal
+public class S3HadoopDelegationTokenReceiver extends AbstractS3DelegationTokenReceiver {
+    @Override
+    public String serviceName() {
+        return "s3-hadoop";
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 92%
copy from flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
copy to flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index d2e298d1a28..cdccc63f2c0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
+org.apache.flink.fs.s3hadoop.token.S3HadoopDelegationTokenProvider
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 92%
copy from flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
copy to flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index 59ba43f7f2a..6debc4eb234 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
+org.apache.flink.fs.s3hadoop.token.S3HadoopDelegationTokenReceiver
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java
new file mode 100644
index 00000000000..81fc5e0e301
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenProvider;
+
+/** Delegation token provider for S3 Presto filesystems. */
+@Internal
+public class S3PrestoDelegationTokenProvider extends AbstractS3DelegationTokenProvider {
+    @Override
+    public String serviceName() {
+        return "s3-presto";
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java
new file mode 100644
index 00000000000..affb747dcf2
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
+
+/** Delegation token receiver for S3 Presto filesystems. */
+@Internal
+public class S3PrestoDelegationTokenReceiver extends AbstractS3DelegationTokenReceiver {
+    @Override
+    public String serviceName() {
+        return "s3-presto";
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 92%
rename from flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
rename to flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index d2e298d1a28..d46a327f518 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
+org.apache.flink.fs.s3presto.token.S3PrestoDelegationTokenProvider
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 92%
rename from flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
rename to flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index 59ba43f7f2a..c0d50f8084e 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
+org.apache.flink.fs.s3presto.token.S3PrestoDelegationTokenReceiver
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index c144037d223..fd5929cbdfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -116,6 +116,11 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
         checkProviderAndReceiverConsistency(
                 delegationTokenProviders,
                 delegationTokenReceiverRepository.delegationTokenReceivers);
+        Set<String> warnings = new HashSet<>();
+        checkSamePrefixedProviders(delegationTokenProviders, warnings);
+        for (String warning : warnings) {
+            LOG.warn(warning);
+        }
     }
 
     private Map<String, DelegationTokenProvider> loadProviders() {
@@ -132,8 +137,9 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
                                     provider.serviceName());
                             checkState(
                                     !providers.containsKey(provider.serviceName()),
-                                    "Delegation token provider with service name {} has multiple implementations",
-                                    provider.serviceName());
+                                    "Delegation token provider with service name "
+                                            + provider.serviceName()
+                                            + " has multiple implementations");
                             providers.put(provider.serviceName(), provider);
                         } else {
                             LOG.info(
@@ -203,6 +209,22 @@ public class DefaultDelegationTokenManager implements DelegationTokenManager {
         LOG.info("Provider and receiver instances are consistent");
     }
 
+    @VisibleForTesting
+    static void checkSamePrefixedProviders(
+            Map<String, DelegationTokenProvider> providers, Set<String> warnings) {
+        Set<String> providerPrefixes = new HashSet<>();
+        for (String name : providers.keySet()) {
+            String[] split = name.split("-");
+            if (!providerPrefixes.add(split[0])) {
+                String msg =
+                        String.format(
+                                "Multiple providers loaded with the same prefix: %s. This might lead to unintended consequences, please consider using only one of them.",
+                                split[0]);
+                warnings.add(msg);
+            }
+        }
+    }
+
     /**
      * Obtains new tokens in a one-time fashion and leaves it up to the caller to distribute them.
      */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index b1b19109cbe..0fbf1c5bdb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -32,7 +32,9 @@ import java.time.Clock;
 import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.time.Instant.ofEpochMilli;
@@ -162,6 +164,28 @@ public class DefaultDelegationTokenManagerTest {
         assertTrue(receivers.containsKey("test"));
     }
 
+    @Test
+    public void checkSamePrefixedProvidersShouldNotGiveErrorsWhenNoSamePrefix() {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        providers.put("s3-hadoop", new TestDelegationTokenProvider());
+        Set<String> warnings = new HashSet<>();
+        DefaultDelegationTokenManager.checkSamePrefixedProviders(providers, warnings);
+        assertTrue(warnings.isEmpty());
+    }
+
+    @Test
+    public void checkSamePrefixedProvidersShouldGiveErrorsWhenSamePrefix() {
+        Map<String, DelegationTokenProvider> providers = new HashMap<>();
+        providers.put("s3-hadoop", new TestDelegationTokenProvider());
+        providers.put("s3-presto", new TestDelegationTokenProvider());
+        Set<String> warnings = new HashSet<>();
+        DefaultDelegationTokenManager.checkSamePrefixedProviders(providers, warnings);
+        assertEquals(1, warnings.size());
+        assertEquals(
+                "Multiple providers loaded with the same prefix: s3. This might lead to unintended consequences, please consider using only one of them.",
+                warnings.iterator().next());
+    }
+
     @Test
     public void startTokensUpdateShouldScheduleRenewal() {
         final ManuallyTriggeredScheduledExecutor scheduledExecutor =


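For readers skimming the patch, the prefix check added in checkSamePrefixedProviders can be sketched in isolation. This is a minimal illustration and not the Flink code itself: the class name PrefixCheckSketch and the method collidingPrefixes are hypothetical, and plain service-name strings stand in for the provider map. It assumes, as the patch does, that the prefix is the part of the service name before the first "-".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-alone sketch of the prefix check; not part of Flink.
public class PrefixCheckSketch {

    // Returns every prefix (text before the first '-') shared by two or more service names.
    static Set<String> collidingPrefixes(Iterable<String> serviceNames) {
        Set<String> seen = new HashSet<>();
        Set<String> collisions = new LinkedHashSet<>();
        for (String name : serviceNames) {
            String prefix = name.split("-")[0];
            // Set.add returns false when the prefix was already recorded.
            if (!seen.add(prefix)) {
                collisions.add(prefix);
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        // Both S3 plugins loaded together: the shared "s3" prefix is reported.
        System.out.println(collidingPrefixes(Arrays.asList("s3-hadoop", "s3-presto"))); // prints [s3]
        // Only one S3 plugin loaded: nothing to report.
        System.out.println(collidingPrefixes(Arrays.asList("s3-hadoop", "hbase"))); // prints []
    }
}
```

Unlike the duplicate service-name check in loadProviders, which fails via checkState, a shared prefix in the patch only produces a logged warning.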