This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4ba336656b6 [FLINK-30402][security][runtime] Separate token framework
generic and hadoop specific parts
4ba336656b6 is described below
commit 4ba336656b6a24c99b7e4e50ef9772fd58d79792
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Dec 14 16:28:48 2022 +0100
[FLINK-30402][security][runtime] Separate token framework generic and
hadoop specific parts
---
.../runtime/entrypoint/ClusterEntrypoint.java | 2 +-
.../flink/runtime/minicluster/MiniCluster.java | 2 +-
.../runtime/security/modules/HadoopModule.java | 2 +-
.../{ => hadoop}/HBaseDelegationTokenProvider.java | 2 +-
.../HadoopDelegationTokenConverter.java} | 4 +--
.../HadoopDelegationTokenProvider.java | 3 ++-
.../HadoopDelegationTokenUpdater.java} | 10 +++----
.../HadoopFSDelegationTokenProvider.java | 2 +-
.../KerberosDelegationTokenManager.java | 11 +++++---
.../KerberosDelegationTokenManagerFactory.java | 4 ++-
.../token/{ => hadoop}/KerberosLoginProvider.java | 2 +-
.../flink/runtime/taskexecutor/TaskExecutor.java | 7 ++---
...ity.token.hadoop.HadoopDelegationTokenProvider} | 4 +--
...ptionThrowingHadoopDelegationTokenProvider.java | 2 +-
.../HadoopDelegationTokenConverterTest.java} | 10 +++----
.../HadoopDelegationTokenUpdaterITCase.java} | 12 ++++-----
.../HadoopFSDelegationTokenProviderITCase.java | 31 +++++++++++++---------
.../KerberosDelegationTokenManagerITCase.java | 3 ++-
.../{ => hadoop}/KerberosLoginProviderITCase.java | 2 +-
.../TestHadoopDelegationTokenIdentifier.java} | 8 +++---
.../TestHadoopDelegationTokenProvider.java | 7 +++--
...ity.token.hadoop.HadoopDelegationTokenProvider} | 4 +--
...rg.apache.hadoop.security.token.TokenIdentifier | 2 +-
.../apache/flink/yarn/YarnClusterDescriptor.java | 8 +++---
24 files changed, 80 insertions(+), 64 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ce67ccd3010..243dbe059a6 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -66,7 +66,7 @@ import
org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
+import
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import
org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bab16dd21f8..a4c2fd06968 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -90,7 +90,7 @@ import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
+import
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 6e77341e755..2ebcdb6f1bf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.security.modules;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.hadoop.HadoopUserUtils;
import org.apache.flink.runtime.security.SecurityConfiguration;
-import org.apache.flink.runtime.security.token.KerberosLoginProvider;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
similarity index 99%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 64b2fba8e8e..4441267f355 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
similarity index 94%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
index ad1927b43f3..a3829ce8cd3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
@@ -29,7 +29,7 @@ import java.io.IOException;
/** Delegation token serializer and deserializer functionality. */
@Internal
-public class DelegationTokenConverter {
+public class HadoopDelegationTokenConverter {
/** Serializes delegation tokens. */
public static byte[] serialize(Credentials credentials) throws IOException
{
try (DataOutputBuffer dob = new DataOutputBuffer()) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
similarity index 94%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
index 834a9da2259..061f32c6d6d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
similarity index 85%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
index 83cc174c6cf..a4528cfad54 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
@@ -29,18 +29,18 @@ import java.io.IOException;
/** Delegation token updater functionality. */
@Internal
-public final class DelegationTokenUpdater {
+public final class HadoopDelegationTokenUpdater {
- private static final Logger LOG =
LoggerFactory.getLogger(DelegationTokenUpdater.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class);
- private DelegationTokenUpdater() {}
+ private HadoopDelegationTokenUpdater() {}
/** Updates delegation tokens for the current user. */
public static void addCurrentUserCredentials(byte[] credentialsBytes)
throws IOException {
if (credentialsBytes == null || credentialsBytes.length == 0) {
throw new IllegalArgumentException("Illegal credentials tried to
be set");
}
- Credentials credentials =
DelegationTokenConverter.deserialize(credentialsBytes);
+ Credentials credentials =
HadoopDelegationTokenConverter.deserialize(credentialsBytes);
LOG.info("Updating delegation tokens for current user");
dumpAllTokens(credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
similarity index 99%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index ad895bcee25..81d25eb0f51 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
similarity index 96%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
index 6b5ebed7aaa..ff4c95ca4af 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
@@ -16,11 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenListener;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.ScheduledExecutor;
@@ -189,7 +191,7 @@ public class KerberosDelegationTokenManager implements
DelegationTokenManager {
.flatMap(nr ->
nr.map(Stream::of).orElseGet(Stream::empty))
.min(Long::compare);
- DelegationTokenUpdater.dumpAllTokens(credentials);
+ HadoopDelegationTokenUpdater.dumpAllTokens(credentials);
return nextRenewal;
}
@@ -269,11 +271,12 @@ public class KerberosDelegationTokenManager implements
DelegationTokenManager {
Optional<Long> nextRenewal =
obtainDelegationTokensAndGetNextRenewal(credentials);
if (credentials.numberOfTokens() > 0) {
- byte[] credentialsBytes =
DelegationTokenConverter.serialize(credentials);
+ byte[] credentialsBytes =
HadoopDelegationTokenConverter.serialize(credentials);
-
DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
LOG.info("Notifying listener about new tokens");
+ checkNotNull(delegationTokenListener, "Listener must not be
null");
delegationTokenListener.onNewTokensObtained(credentialsBytes);
LOG.info("Listener notified successfully");
} else {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
similarity index 93%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
index f16ce767803..3a812bfc769 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
@@ -16,12 +16,14 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.hadoop.HadoopDependency;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
similarity index 98%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
index c2ca529a0fb..4851a4f470c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6e2a3c46bf6..24384585475 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
-import org.apache.flink.runtime.security.token.DelegationTokenUpdater;
+import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -1340,7 +1340,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
}
try {
- DelegationTokenUpdater.addCurrentUserCredentials(tokens);
+ HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
return CompletableFuture.completedFuture(Acknowledge.get());
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
@@ -2379,7 +2379,8 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
if (success.getInitialTokens() != null) {
try {
log.info("Receive initial delegation tokens from resource
manager");
-
DelegationTokenUpdater.addCurrentUserCredentials(success.getInitialTokens());
+ HadoopDelegationTokenUpdater.addCurrentUserCredentials(
+ success.getInitialTokens());
} catch (Throwable t) {
log.error("Could not update delegation tokens.", t);
ExceptionUtils.rethrowIfFatalError(t);
diff --git
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
similarity index 83%
rename from
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
rename to
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
index 0d58510a34a..84b369d4efb 100644
---
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
+++
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
-org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenProvider
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
similarity index 97%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
index ded5cf6c2ae..234b0b68b13 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.configuration.Configuration;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
similarity index 84%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
index 013cccec269..8ab3625df6f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
@@ -28,8 +28,8 @@ import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-/** Test for {@link DelegationTokenConverter}. */
-public class DelegationTokenConverterTest {
+/** Test for {@link HadoopDelegationTokenConverter}. */
+public class HadoopDelegationTokenConverterTest {
@Test
public void testRoundTrip() throws IOException {
@@ -39,9 +39,9 @@ public class DelegationTokenConverterTest {
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind,
tokenService));
- byte[] credentialsBytes =
DelegationTokenConverter.serialize(credentials);
+ byte[] credentialsBytes =
HadoopDelegationTokenConverter.serialize(credentials);
Credentials deserializedCredentials =
- DelegationTokenConverter.deserialize(credentialsBytes);
+ HadoopDelegationTokenConverter.deserialize(credentialsBytes);
assertEquals(1, credentials.getAllTokens().size());
assertEquals(1, deserializedCredentials.getAllTokens().size());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
similarity index 89%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
index c30111575ea..70d1a1cfab4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.io.Text;
@@ -36,8 +36,8 @@ import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-/** Test for {@link DelegationTokenConverter}. */
-public class DelegationTokenUpdaterITCase {
+/** Test for {@link HadoopDelegationTokenUpdater}. */
+public class HadoopDelegationTokenUpdaterITCase {
@Test
public void
addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() {
@@ -58,7 +58,7 @@ public class DelegationTokenUpdaterITCase {
assertThrows(
IllegalArgumentException.class,
() ->
-
DelegationTokenUpdater.addCurrentUserCredentials(
+
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
credentialsBytes));
assertTrue(e.getMessage().contains("Illegal credentials"));
}
@@ -72,13 +72,13 @@ public class DelegationTokenUpdaterITCase {
credentials.addToken(
tokenService, new Token<>(new byte[4], new byte[4], tokenKind,
tokenService));
- byte[] credentialsBytes =
DelegationTokenConverter.serialize(credentials);
+ byte[] credentialsBytes =
HadoopDelegationTokenConverter.serialize(credentials);
try (MockedStatic<UserGroupInformation> ugi =
mockStatic(UserGroupInformation.class)) {
UserGroupInformation userGroupInformation =
mock(UserGroupInformation.class);
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
- DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
ArgumentCaptor<Credentials> argumentCaptor =
ArgumentCaptor.forClass(Credentials.class);
verify(userGroupInformation,
times(1)).addCredentials(argumentCaptor.capture());
assertTrue(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
similarity index 87%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
index f5e2db75100..5f23695b973 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -47,17 +47,20 @@ class HadoopFSDelegationTokenProviderITCase {
final Text tokenService1 = new Text("TEST_TOKEN_SERVICE1");
final Text tokenService2 = new Text("TEST_TOKEN_SERVICE2");
- private class TestDelegationToken extends
Token<TestDelegationTokenIdentifier> {
+ private class TestDelegationToken extends
Token<TestHadoopDelegationTokenIdentifier> {
private long newExpiration;
public TestDelegationToken(
- Text tokenService, TestDelegationTokenIdentifier identifier,
long newExpiration) {
+ Text tokenService,
+ TestHadoopDelegationTokenIdentifier identifier,
+ long newExpiration) {
super(identifier.getBytes(), new byte[4], identifier.getKind(),
tokenService);
this.newExpiration = newExpiration;
}
- public TestDelegationToken(Text tokenService,
TestDelegationTokenIdentifier identifier) {
+ public TestDelegationToken(
+ Text tokenService, TestHadoopDelegationTokenIdentifier
identifier) {
this(tokenService, identifier, 0L);
}
@@ -112,14 +115,14 @@ class HadoopFSDelegationTokenProviderITCase {
String renewer,
Set<FileSystem> fileSystemsToAccess,
Credentials credentials) {
- TestDelegationTokenIdentifier tokenIdentifier1 =
- new TestDelegationTokenIdentifier(NOW);
+ TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
+ new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService1,
new TestDelegationToken(tokenService1,
tokenIdentifier1, NOW + 1));
- TestDelegationTokenIdentifier tokenIdentifier2 =
- new TestDelegationTokenIdentifier(NOW);
+ TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
+ new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService2,
new TestDelegationToken(tokenService2,
tokenIdentifier2, NOW + 2));
@@ -155,10 +158,12 @@ class HadoopFSDelegationTokenProviderITCase {
HadoopFSDelegationTokenProvider provider = new
HadoopFSDelegationTokenProvider();
Clock constantClock = Clock.fixed(ofEpochMilli(NOW),
ZoneId.systemDefault());
Credentials credentials = new Credentials();
- TestDelegationTokenIdentifier tokenIdentifier1 = new
TestDelegationTokenIdentifier(NOW);
+ TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
+ new TestHadoopDelegationTokenIdentifier(NOW);
credentials.addToken(
tokenService1, new TestDelegationToken(tokenService1,
tokenIdentifier1));
- TestDelegationTokenIdentifier tokenIdentifier2 = new
TestDelegationTokenIdentifier(NOW + 1);
+ TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
+ new TestHadoopDelegationTokenIdentifier(NOW + 1);
credentials.addToken(
tokenService2, new TestDelegationToken(tokenService2,
tokenIdentifier2));
@@ -173,7 +178,7 @@ class HadoopFSDelegationTokenProviderITCase {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW),
ZoneId.systemDefault());
long issueDate = NOW + 1;
AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestDelegationTokenIdentifier(issueDate);
+ new TestHadoopDelegationTokenIdentifier(issueDate);
assertEquals(
issueDate,
@@ -188,7 +193,7 @@ class HadoopFSDelegationTokenProviderITCase {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW),
ZoneId.systemDefault());
long issueDate = NOW - 1;
AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestDelegationTokenIdentifier(issueDate);
+ new TestHadoopDelegationTokenIdentifier(issueDate);
assertEquals(
issueDate,
@@ -203,7 +208,7 @@ class HadoopFSDelegationTokenProviderITCase {
Clock constantClock = Clock.fixed(ofEpochMilli(NOW),
ZoneId.systemDefault());
long issueDate = -1;
AbstractDelegationTokenIdentifier tokenIdentifier =
- new TestDelegationTokenIdentifier(issueDate);
+ new TestHadoopDelegationTokenIdentifier(issueDate);
assertEquals(
NOW,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
similarity index 98%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
index 861e8f536b9..f40baab5c38 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.hadoop.security.UserGroupInformation;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
similarity index 99%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
index 9dd17edd95c..a7dde7c1c2d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.configuration.Configuration;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
similarity index 86%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
index 0d041d2b2a3..66db6751a6c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -29,16 +29,16 @@ import java.io.IOException;
* Example test implementation of {@link AbstractDelegationTokenIdentifier}
which is used in
* integration tests.
*/
-public class TestDelegationTokenIdentifier extends
AbstractDelegationTokenIdentifier {
+public class TestHadoopDelegationTokenIdentifier extends
AbstractDelegationTokenIdentifier {
private static final Text tokenKind = new Text("TEST_TOKEN_KIND");
private long issueDate;
// This is needed for service loader
- public TestDelegationTokenIdentifier() {}
+ public TestHadoopDelegationTokenIdentifier() {}
- public TestDelegationTokenIdentifier(long issueDate) {
+ public TestHadoopDelegationTokenIdentifier(long issueDate) {
this.issueDate = issueDate;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
similarity index 87%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
index f78e1524efb..7b692ad8da2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
import org.apache.flink.configuration.Configuration;
@@ -24,7 +24,10 @@ import org.apache.hadoop.security.Credentials;
import java.util.Optional;
-/** An example implementation of {@link HadoopDelegationTokenProvider} which
does nothing. */
+/**
+ * An example implementation of {@link
+ *
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider}
which does nothing.
+ */
public class TestHadoopDelegationTokenProvider implements
HadoopDelegationTokenProvider {
@Override
diff --git
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
similarity index 81%
rename from
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
rename to
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
index 55922e83aaf..74e3982f7bc 100644
---
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
+++
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
@@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.runtime.security.token.TestHadoopDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingHadoopDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.ExceptionThrowingHadoopDelegationTokenProvider
diff --git
a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index 59e6d84d595..611dd95044e 100644
---
a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
+++
b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.runtime.security.token.TestDelegationTokenIdentifier
+org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenIdentifier
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 3bfa9a8d843..235400fe7ce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -52,10 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.security.token.DelegationTokenConverter;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosLoginProvider;
+import
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkException;
@@ -1300,7 +1300,7 @@ public class YarnClusterDescriptor implements
ClusterDescriptor<ApplicationId> {
new KerberosDelegationTokenManager(flinkConfiguration, null,
null);
delegationTokenManager.obtainDelegationTokens(credentials);
- ByteBuffer tokens =
ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
+ ByteBuffer tokens =
ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
containerLaunchContext.setTokens(tokens);
LOG.info("Delegation tokens added to the AM container.");