This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push:
new af4c68ab8c9 [FLINK-31839][filesystems] Fix flink-s3-fs-hadoop and
flink-s3-fs-presto plugin collision
af4c68ab8c9 is described below
commit af4c68ab8c910ffc4e02ec3c0b07fe727d867303
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Apr 19 18:49:15 2023 +0200
[FLINK-31839][filesystems] Fix flink-s3-fs-hadoop and flink-s3-fs-presto
plugin collision
These collided with regards to their name of delegation token service.
---
.../security/security-delegation-token.md | 21 ++++++++-----
.../fs/s3/common/AbstractS3FileSystemFactory.java | 4 +--
...java => AbstractS3DelegationTokenProvider.java} | 10 ++-----
...java => AbstractS3DelegationTokenReceiver.java} | 12 +++-----
.../DynamicTemporaryAWSCredentialsProvider.java | 2 +-
... => AbstractS3DelegationTokenProviderTest.java} | 23 ++++++++++----
... => AbstractS3DelegationTokenReceiverTest.java} | 35 ++++++++++++++--------
...DynamicTemporaryAWSCredentialsProviderTest.java | 12 ++++++--
.../token/S3HadoopDelegationTokenProvider.java | 31 +++++++++++++++++++
.../token/S3HadoopDelegationTokenReceiver.java | 31 +++++++++++++++++++
...ink.core.security.token.DelegationTokenProvider | 2 +-
...ink.core.security.token.DelegationTokenReceiver | 2 +-
.../token/S3PrestoDelegationTokenProvider.java | 31 +++++++++++++++++++
.../token/S3PrestoDelegationTokenReceiver.java | 31 +++++++++++++++++++
...ink.core.security.token.DelegationTokenProvider | 2 +-
...ink.core.security.token.DelegationTokenReceiver | 2 +-
.../token/DefaultDelegationTokenManager.java | 26 ++++++++++++++--
.../token/DefaultDelegationTokenManagerTest.java | 24 +++++++++++++++
18 files changed, 248 insertions(+), 53 deletions(-)
diff --git a/docs/content/docs/deployment/security/security-delegation-token.md
b/docs/content/docs/deployment/security/security-delegation-token.md
index 16ce2228783..a7f1876d595 100644
--- a/docs/content/docs/deployment/security/security-delegation-token.md
+++ b/docs/content/docs/deployment/security/security-delegation-token.md
@@ -213,23 +213,30 @@ loaded and then will be overwritten by the loading
mechanism in Flink.
There are certain limitations to bear in mind when talking about DTs.
-Firstly, not all DTs actually expose their renewal period. This is a service
configuration that is
+* Not all DTs actually expose their renewal period. This is a service
configuration that is
not generally exposed to clients. For this reason, certain DT providers cannot
provide a renewal period,
thus requiring that the service's configuration is in some way synchronized
with another service
-that does provide that information.
-
+that does provide that information.
The HDFS service, which is generally available when DTs are needed in the
first place, provides
this information, so in general it's a good idea for all services using DTs to
use the same
configuration as HDFS for the renewal period.
-Secondly, Flink is not parsing the user application code, so it doesn't know
which delegation
+* Flink is not parsing the user application code, so it doesn't know which
delegation
tokens will be needed. This means that Flink will try to get as many
delegation tokens as is possible
based on the configuration available. That means that if an HBase token
provider is enabled but the app
doesn't actually use HBase, a DT will still be generated. The user would have
to explicitly
disable the mentioned provider in that case.
-Thirdly, it is challenging to create DTs "on demand". Flink
obtains/distributes tokens upfront
-and re-obtains/re-distributes them periodically.
-
+* It is challenging to create DTs "on demand". Flink obtains/distributes
tokens upfront
+and re-obtains/re-distributes them periodically.
The advantage, though, is that user code does not need to worry about DTs,
since Flink will handle
them transparently when the proper configuration is available.
+
+* There are external file system plugins which are authenticating to the same
service. One good example
+is `s3-hadoop` and `s3-presto`. They both authenticate to S3. They're having
different service names
+but obtaining tokens for the same service which might cause unintended
consequences. Since they're
+obtaining tokens for the same service they store these tokens at the same
place. It's easy to see that
+if they're used together with the same credentials then there will be no
issues since the tokens are
+going to be overwritten by each other in a single-threaded way (which belongs
to a single user).
+However, if the plugins are configured with different user credentials then
the token which will be
+used for data processing can belong to any of the users which is
non-deterministic.
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 0dac58be3bf..285d0b96098 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.Preconditions;
@@ -124,7 +124,7 @@ public abstract class AbstractS3FileSystemFactory
implements FileSystemFactory {
// create the Hadoop FileSystem
org.apache.hadoop.conf.Configuration hadoopConfig =
hadoopConfigLoader.getOrLoadHadoopConfig();
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
+ AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfig);
org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
similarity index 94%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
rename to
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
index 4c04b975908..82e99dcde89 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProvider.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProvider.java
@@ -38,19 +38,15 @@ import java.util.Optional;
/** Delegation token provider for S3 filesystems. */
@Internal
-public class S3DelegationTokenProvider implements DelegationTokenProvider {
+public abstract class AbstractS3DelegationTokenProvider implements
DelegationTokenProvider {
- private static final Logger LOG =
LoggerFactory.getLogger(S3DelegationTokenProvider.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractS3DelegationTokenProvider.class);
private String region;
private String accessKey;
private String secretKey;
- @Override
- public String serviceName() {
- return "s3";
- }
-
@Override
public void init(Configuration configuration) {
region = configuration.getString(String.format("%s.region",
serviceConfigPrefix()), null);
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
similarity index 91%
rename from
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
rename to
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
index 55dd18d3c4b..bad552f432b 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiver.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiver.java
@@ -34,11 +34,12 @@ import javax.annotation.Nullable;
/** Delegation token receiver for S3 filesystems. */
@Internal
-public class S3DelegationTokenReceiver implements DelegationTokenReceiver {
+public abstract class AbstractS3DelegationTokenReceiver implements
DelegationTokenReceiver {
public static final String PROVIDER_CONFIG_NAME =
"fs.s3a.aws.credentials.provider";
- private static final Logger LOG =
LoggerFactory.getLogger(S3DelegationTokenReceiver.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractS3DelegationTokenReceiver.class);
@VisibleForTesting @Nullable static volatile Credentials credentials;
@@ -69,11 +70,6 @@ public class S3DelegationTokenReceiver implements
DelegationTokenReceiver {
LOG.info("Updated Hadoop configuration successfully");
}
- @Override
- public String serviceName() {
- return "s3";
- }
-
@Override
public void init(Configuration configuration) {
region =
@@ -92,7 +88,7 @@ public class S3DelegationTokenReceiver implements
DelegationTokenReceiver {
LOG.info("Updating session credentials");
credentials =
InstantiationUtil.deserializeObject(
- tokens,
S3DelegationTokenReceiver.class.getClassLoader());
+ tokens,
AbstractS3DelegationTokenReceiver.class.getClassLoader());
LOG.info(
"Session credentials updated successfully with access key: {}
expiration: {}",
credentials.getAccessKeyId(),
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
index 8760f1aa3ee..8b04ce1a118 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
+++
b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
@@ -47,7 +47,7 @@ public class DynamicTemporaryAWSCredentialsProvider
implements AWSCredentialsPro
@Override
public AWSCredentials getCredentials() throws SdkBaseException {
- Credentials credentials = S3DelegationTokenReceiver.getCredentials();
+ Credentials credentials =
AbstractS3DelegationTokenReceiver.getCredentials();
if (credentials == null) {
throw new NoAwsCredentialsException(COMPONENT);
}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
similarity index 78%
rename from
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
rename to
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
index d0109cbc44d..46f1f32765f 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenProviderTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenProviderTest.java
@@ -20,30 +20,41 @@ package org.apache.flink.fs.s3.common.token;
import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import static
org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/** Tests for {@link S3DelegationTokenProvider}. */
-public class S3DelegationTokenProviderTest {
+/** Tests for {@link AbstractS3DelegationTokenProvider}. */
+public class AbstractS3DelegationTokenProviderTest {
private static final String REGION = "testRegion";
private static final String ACCESS_KEY_ID = "testAccessKeyId";
private static final String SECRET_ACCESS_KEY = "testSecretAccessKey";
+ private AbstractS3DelegationTokenProvider provider;
+
+ @BeforeEach
+ public void beforeEach() {
+ provider =
+ new AbstractS3DelegationTokenProvider() {
+ @Override
+ public String serviceName() {
+ return "s3";
+ }
+ };
+ }
+
@Test
public void delegationTokensRequiredShouldReturnFalseWithoutCredentials() {
- S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
provider.init(new Configuration());
-
assertFalse(provider.delegationTokensRequired());
}
@Test
public void delegationTokensRequiredShouldReturnTrueWithCredentials() {
- S3DelegationTokenProvider provider = new S3DelegationTokenProvider();
Configuration configuration = new Configuration();
configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
configuration.setString(CONFIG_PREFIX + ".s3.access-key",
ACCESS_KEY_ID);
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
similarity index 75%
rename from
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
rename to
flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
index ab817b0a532..0ba43df32f9 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/S3DelegationTokenReceiverTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/AbstractS3DelegationTokenReceiverTest.java
@@ -20,29 +20,29 @@ package org.apache.flink.fs.s3.common.token;
import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import static
org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX;
-import static
org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
+import static
org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver.PROVIDER_CONFIG_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-/** Tests for {@link S3DelegationTokenReceiver}. */
-public class S3DelegationTokenReceiverTest {
+/** Tests for {@link AbstractS3DelegationTokenReceiver}. */
+public class AbstractS3DelegationTokenReceiverTest {
private static final String PROVIDER_CLASS_NAME = "TestProvider";
private static final String REGION = "testRegion";
@BeforeEach
void beforeEach() {
- S3DelegationTokenReceiver.region = null;
+ AbstractS3DelegationTokenReceiver.region = null;
}
@AfterEach
void afterEach() {
- S3DelegationTokenReceiver.region = null;
+ AbstractS3DelegationTokenReceiver.region = null;
}
@Test
@@ -50,7 +50,7 @@ public class S3DelegationTokenReceiverTest {
org.apache.hadoop.conf.Configuration hadoopConfiguration =
new org.apache.hadoop.conf.Configuration();
hadoopConfiguration.set(PROVIDER_CONFIG_NAME, "");
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+
AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertEquals(
DynamicTemporaryAWSCredentialsProvider.NAME,
hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
@@ -61,7 +61,7 @@ public class S3DelegationTokenReceiverTest {
org.apache.hadoop.conf.Configuration hadoopConfiguration =
new org.apache.hadoop.conf.Configuration();
hadoopConfiguration.set(PROVIDER_CONFIG_NAME, PROVIDER_CLASS_NAME);
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+
AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
String[] providers =
hadoopConfiguration.get(PROVIDER_CONFIG_NAME).split(",");
assertEquals(2, providers.length);
assertEquals(DynamicTemporaryAWSCredentialsProvider.NAME,
providers[0]);
@@ -73,7 +73,7 @@ public class S3DelegationTokenReceiverTest {
org.apache.hadoop.conf.Configuration hadoopConfiguration =
new org.apache.hadoop.conf.Configuration();
hadoopConfiguration.set(PROVIDER_CONFIG_NAME,
DynamicTemporaryAWSCredentialsProvider.NAME);
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+
AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertEquals(
DynamicTemporaryAWSCredentialsProvider.NAME,
hadoopConfiguration.get(PROVIDER_CONFIG_NAME));
@@ -81,25 +81,34 @@ public class S3DelegationTokenReceiverTest {
@Test
public void updateHadoopConfigShouldNotUpdateRegionWhenNotConfigured() {
- S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+ AbstractS3DelegationTokenReceiver receiver = createReceiver();
receiver.init(new Configuration());
org.apache.hadoop.conf.Configuration hadoopConfiguration =
new org.apache.hadoop.conf.Configuration();
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+
AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertNull(hadoopConfiguration.get("fs.s3a.endpoint.region"));
}
@Test
public void updateHadoopConfigShouldUpdateRegionWhenConfigured() {
- S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+ AbstractS3DelegationTokenReceiver receiver = createReceiver();
Configuration configuration = new Configuration();
configuration.setString(CONFIG_PREFIX + ".s3.region", REGION);
receiver.init(configuration);
org.apache.hadoop.conf.Configuration hadoopConfiguration =
new org.apache.hadoop.conf.Configuration();
- S3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
+
AbstractS3DelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertEquals(REGION,
hadoopConfiguration.get("fs.s3a.endpoint.region"));
}
+
+ private AbstractS3DelegationTokenReceiver createReceiver() {
+ return new AbstractS3DelegationTokenReceiver() {
+ @Override
+ public String serviceName() {
+ return "s3";
+ }
+ };
+ }
}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
index a6b926f8737..8115b61a711 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProviderTest.java
@@ -39,12 +39,12 @@ public class DynamicTemporaryAWSCredentialsProviderTest {
@BeforeEach
void beforeEach() {
- S3DelegationTokenReceiver.credentials = null;
+ AbstractS3DelegationTokenReceiver.credentials = null;
}
@AfterEach
void afterEach() {
- S3DelegationTokenReceiver.credentials = null;
+ AbstractS3DelegationTokenReceiver.credentials = null;
}
@Test
@@ -61,7 +61,13 @@ public class DynamicTemporaryAWSCredentialsProviderTest {
new DynamicTemporaryAWSCredentialsProvider();
Credentials credentials =
new Credentials(ACCESS_KEY_ID, SECRET_ACCESS_KEY,
SESSION_TOKEN, null);
- S3DelegationTokenReceiver receiver = new S3DelegationTokenReceiver();
+ AbstractS3DelegationTokenReceiver receiver =
+ new AbstractS3DelegationTokenReceiver() {
+ @Override
+ public String serviceName() {
+ return "s3";
+ }
+ };
receiver.onNewTokensObtained(InstantiationUtil.serializeObject(credentials));
BasicSessionCredentials returnedCredentials =
diff --git
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java
new file mode 100644
index 00000000000..a6938546077
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenProvider;
+
+/** Delegation token provider for S3 Hadoop filesystems. */
+@Internal
+public class S3HadoopDelegationTokenProvider extends
AbstractS3DelegationTokenProvider {
+ @Override
+ public String serviceName() {
+ return "s3-hadoop";
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java
new file mode 100644
index 00000000000..7cc2da0cbee
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/token/S3HadoopDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
+
+/** Delegation token receiver for S3 Hadoop filesystems. */
+@Internal
+public class S3HadoopDelegationTokenReceiver extends
AbstractS3DelegationTokenReceiver {
+ @Override
+ public String serviceName() {
+ return "s3-hadoop";
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 92%
copy from
flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
copy to
flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index d2e298d1a28..cdccc63f2c0 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
+org.apache.flink.fs.s3hadoop.token.S3HadoopDelegationTokenProvider
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 92%
copy from
flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
copy to
flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index 59ba43f7f2a..6debc4eb234 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
+org.apache.flink.fs.s3hadoop.token.S3HadoopDelegationTokenReceiver
diff --git
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java
new file mode 100644
index 00000000000..81fc5e0e301
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenProvider;
+
+/** Delegation token provider for S3 Presto filesystems. */
+@Internal
+public class S3PrestoDelegationTokenProvider extends
AbstractS3DelegationTokenProvider {
+ @Override
+ public String serviceName() {
+ return "s3-presto";
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java
new file mode 100644
index 00000000000..affb747dcf2
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/token/S3PrestoDelegationTokenReceiver.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto.token;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.fs.s3.common.token.AbstractS3DelegationTokenReceiver;
+
+/** Delegation token receiver for S3 Presto filesystems. */
+@Internal
+public class S3PrestoDelegationTokenReceiver extends
AbstractS3DelegationTokenReceiver {
+ @Override
+ public String serviceName() {
+ return "s3-presto";
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
similarity index 92%
rename from
flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
rename to
flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
index d2e298d1a28..d46a327f518 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
+++
b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenProvider
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.fs.s3.common.token.S3DelegationTokenProvider
+org.apache.flink.fs.s3presto.token.S3PrestoDelegationTokenProvider
diff --git
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
similarity index 92%
rename from
flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
rename to
flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
index 59ba43f7f2a..c0d50f8084e 100644
---
a/flink-filesystems/flink-s3-fs-base/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
+++
b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/services/org.apache.flink.core.security.token.DelegationTokenReceiver
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.fs.s3.common.token.S3DelegationTokenReceiver
+org.apache.flink.fs.s3presto.token.S3PrestoDelegationTokenReceiver
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index c144037d223..fd5929cbdfe 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -116,6 +116,11 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
checkProviderAndReceiverConsistency(
delegationTokenProviders,
delegationTokenReceiverRepository.delegationTokenReceivers);
+ Set<String> warnings = new HashSet<>();
+ checkSamePrefixedProviders(delegationTokenProviders, warnings);
+ for (String warning : warnings) {
+ LOG.warn(warning);
+ }
}
private Map<String, DelegationTokenProvider> loadProviders() {
@@ -132,8 +137,9 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
provider.serviceName());
checkState(
!providers.containsKey(provider.serviceName()),
- "Delegation token provider with service
name {} has multiple implementations",
- provider.serviceName());
+ "Delegation token provider with service
name "
+ + provider.serviceName()
+ + " has multiple implementations");
providers.put(provider.serviceName(), provider);
} else {
LOG.info(
@@ -203,6 +209,22 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
LOG.info("Provider and receiver instances are consistent");
}
+ @VisibleForTesting
+ static void checkSamePrefixedProviders(
+ Map<String, DelegationTokenProvider> providers, Set<String>
warnings) {
+ Set<String> providerPrefixes = new HashSet<>();
+ for (String name : providers.keySet()) {
+ String[] split = name.split("-");
+ if (!providerPrefixes.add(split[0])) {
+ String msg =
+ String.format(
+ "Multiple providers loaded with the same
prefix: %s. This might lead to unintended consequences, please consider using
only one of them.",
+ split[0]);
+ warnings.add(msg);
+ }
+ }
+ }
+
/**
* Obtains new tokens in a one-time fashion and leaves it up to the caller
to distribute them.
*/
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
index b1b19109cbe..0fbf1c5bdb6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java
@@ -32,7 +32,9 @@ import java.time.Clock;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static java.time.Instant.ofEpochMilli;
@@ -162,6 +164,28 @@ public class DefaultDelegationTokenManagerTest {
assertTrue(receivers.containsKey("test"));
}
+ @Test
+ public void
checkSamePrefixedProvidersShouldNotGiveErrorsWhenNoSamePrefix() {
+ Map<String, DelegationTokenProvider> providers = new HashMap<>();
+ providers.put("s3-hadoop", new TestDelegationTokenProvider());
+ Set<String> warnings = new HashSet<>();
+ DefaultDelegationTokenManager.checkSamePrefixedProviders(providers,
warnings);
+ assertTrue(warnings.isEmpty());
+ }
+
+ @Test
+ public void checkSamePrefixedProvidersShouldGiveErrorsWhenSamePrefix() {
+ Map<String, DelegationTokenProvider> providers = new HashMap<>();
+ providers.put("s3-hadoop", new TestDelegationTokenProvider());
+ providers.put("s3-presto", new TestDelegationTokenProvider());
+ Set<String> warnings = new HashSet<>();
+ DefaultDelegationTokenManager.checkSamePrefixedProviders(providers,
warnings);
+ assertEquals(1, warnings.size());
+ assertEquals(
+ "Multiple providers loaded with the same prefix: s3. This
might lead to unintended consequences, please consider using only one of them.",
+ warnings.iterator().next());
+ }
+
@Test
public void startTokensUpdateShouldScheduleRenewal() {
final ManuallyTriggeredScheduledExecutor scheduledExecutor =