This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ba336656b6 [FLINK-30402][security][runtime] Separate token framework generic and hadoop specific parts
4ba336656b6 is described below

commit 4ba336656b6a24c99b7e4e50ef9772fd58d79792
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Dec 14 16:28:48 2022 +0100

    [FLINK-30402][security][runtime] Separate token framework generic and hadoop specific parts
---
 .../runtime/entrypoint/ClusterEntrypoint.java      |  2 +-
 .../flink/runtime/minicluster/MiniCluster.java     |  2 +-
 .../runtime/security/modules/HadoopModule.java     |  2 +-
 .../{ => hadoop}/HBaseDelegationTokenProvider.java |  2 +-
 .../HadoopDelegationTokenConverter.java}           |  4 +--
 .../HadoopDelegationTokenProvider.java             |  3 ++-
 .../HadoopDelegationTokenUpdater.java}             | 10 +++----
 .../HadoopFSDelegationTokenProvider.java           |  2 +-
 .../KerberosDelegationTokenManager.java            | 11 +++++---
 .../KerberosDelegationTokenManagerFactory.java     |  4 ++-
 .../token/{ => hadoop}/KerberosLoginProvider.java  |  2 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  7 ++---
 ...ity.token.hadoop.HadoopDelegationTokenProvider} |  4 +--
 ...ptionThrowingHadoopDelegationTokenProvider.java |  2 +-
 .../HadoopDelegationTokenConverterTest.java}       | 10 +++----
 .../HadoopDelegationTokenUpdaterITCase.java}       | 12 ++++-----
 .../HadoopFSDelegationTokenProviderITCase.java     | 31 +++++++++++++---------
 .../KerberosDelegationTokenManagerITCase.java      |  3 ++-
 .../{ => hadoop}/KerberosLoginProviderITCase.java  |  2 +-
 .../TestHadoopDelegationTokenIdentifier.java}      |  8 +++---
 .../TestHadoopDelegationTokenProvider.java         |  7 +++--
 ...ity.token.hadoop.HadoopDelegationTokenProvider} |  4 +--
 ...rg.apache.hadoop.security.token.TokenIdentifier |  2 +-
 .../apache/flink/yarn/YarnClusterDescriptor.java   |  8 +++---
 24 files changed, 80 insertions(+), 64 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index ce67ccd3010..243dbe059a6 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -66,7 +66,7 @@ import 
org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.security.contexts.SecurityContext;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import 
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
+import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index bab16dd21f8..a4c2fd06968 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -90,7 +90,7 @@ import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import 
org.apache.flink.runtime.security.token.KerberosDelegationTokenManagerFactory;
+import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManagerFactory;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
index 6e77341e755..2ebcdb6f1bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.security.modules;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.hadoop.HadoopUserUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
-import org.apache.flink.runtime.security.token.KerberosLoginProvider;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
similarity index 99%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
index 64b2fba8e8e..4441267f355 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HBaseDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HBaseDelegationTokenProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
index ad1927b43f3..a3829ce8cd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenConverter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 
@@ -29,7 +29,7 @@ import java.io.IOException;
 
 /** Delegation token serializer and deserializer functionality. */
 @Internal
-public class DelegationTokenConverter {
+public class HadoopDelegationTokenConverter {
     /** Serializes delegation tokens. */
     public static byte[] serialize(Credentials credentials) throws IOException 
{
         try (DataOutputBuffer dob = new DataOutputBuffer()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
index 834a9da2259..061f32c6d6d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenProvider.java
@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
similarity index 85%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
index 83cc174c6cf..a4528cfad54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenUpdater.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdater.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 
@@ -29,18 +29,18 @@ import java.io.IOException;
 
 /** Delegation token updater functionality. */
 @Internal
-public final class DelegationTokenUpdater {
+public final class HadoopDelegationTokenUpdater {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(DelegationTokenUpdater.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(HadoopDelegationTokenUpdater.class);
 
-    private DelegationTokenUpdater() {}
+    private HadoopDelegationTokenUpdater() {}
 
     /** Updates delegation tokens for the current user. */
     public static void addCurrentUserCredentials(byte[] credentialsBytes) 
throws IOException {
         if (credentialsBytes == null || credentialsBytes.length == 0) {
             throw new IllegalArgumentException("Illegal credentials tried to 
be set");
         }
-        Credentials credentials = 
DelegationTokenConverter.deserialize(credentialsBytes);
+        Credentials credentials = 
HadoopDelegationTokenConverter.deserialize(credentialsBytes);
         LOG.info("Updating delegation tokens for current user");
         dumpAllTokens(credentials);
         UserGroupInformation.getCurrentUser().addCredentials(credentials);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
similarity index 99%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
index ad895bcee25..81d25eb0f51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
similarity index 96%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
index 6b5ebed7aaa..ff4c95ca4af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManager.java
@@ -16,11 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.security.token.DelegationTokenListener;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
@@ -189,7 +191,7 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
                         .flatMap(nr -> 
nr.map(Stream::of).orElseGet(Stream::empty))
                         .min(Long::compare);
 
-        DelegationTokenUpdater.dumpAllTokens(credentials);
+        HadoopDelegationTokenUpdater.dumpAllTokens(credentials);
 
         return nextRenewal;
     }
@@ -269,11 +271,12 @@ public class KerberosDelegationTokenManager implements 
DelegationTokenManager {
             Optional<Long> nextRenewal = 
obtainDelegationTokensAndGetNextRenewal(credentials);
 
             if (credentials.numberOfTokens() > 0) {
-                byte[] credentialsBytes = 
DelegationTokenConverter.serialize(credentials);
+                byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
 
-                
DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+                
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
 
                 LOG.info("Notifying listener about new tokens");
+                checkNotNull(delegationTokenListener, "Listener must not be 
null");
                 delegationTokenListener.onNewTokensObtained(credentialsBytes);
                 LOG.info("Listener notified successfully");
             } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
similarity index 93%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
index f16ce767803..3a812bfc769 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerFactory.java
@@ -16,12 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.hadoop.HadoopDependency;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
 import org.slf4j.Logger;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
similarity index 98%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
index c2ca529a0fb..4851a4f470c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/KerberosLoginProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6e2a3c46bf6..24384585475 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -94,7 +94,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
-import org.apache.flink.runtime.security.token.DelegationTokenUpdater;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenUpdater;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
@@ -1340,7 +1340,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         }
 
         try {
-            DelegationTokenUpdater.addCurrentUserCredentials(tokens);
+            HadoopDelegationTokenUpdater.addCurrentUserCredentials(tokens);
             return CompletableFuture.completedFuture(Acknowledge.get());
         } catch (Throwable t) {
             log.error("Could not update delegation tokens.", t);
@@ -2379,7 +2379,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
             if (success.getInitialTokens() != null) {
                 try {
                     log.info("Receive initial delegation tokens from resource 
manager");
-                    
DelegationTokenUpdater.addCurrentUserCredentials(success.getInitialTokens());
+                    HadoopDelegationTokenUpdater.addCurrentUserCredentials(
+                            success.getInitialTokens());
                 } catch (Throwable t) {
                     log.error("Could not update delegation tokens.", t);
                     ExceptionUtils.rethrowIfFatalError(t);
diff --git 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
similarity index 83%
rename from 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
rename to 
flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
index 0d58510a34a..84b369d4efb 100644
--- 
a/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
+++ 
b/flink-runtime/src/main/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.HadoopFSDelegationTokenProvider
-org.apache.flink.runtime.security.token.HBaseDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.HadoopFSDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.HBaseDelegationTokenProvider
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
similarity index 97%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
index ded5cf6c2ae..234b0b68b13 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingHadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/ExceptionThrowingHadoopDelegationTokenProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.configuration.Configuration;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
similarity index 84%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
index 013cccec269..8ab3625df6f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenConverterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenConverterTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -28,8 +28,8 @@ import java.io.IOException;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
-/** Test for {@link DelegationTokenConverter}. */
-public class DelegationTokenConverterTest {
+/** Test for {@link HadoopDelegationTokenConverter}. */
+public class HadoopDelegationTokenConverterTest {
 
     @Test
     public void testRoundTrip() throws IOException {
@@ -39,9 +39,9 @@ public class DelegationTokenConverterTest {
         credentials.addToken(
                 tokenService, new Token<>(new byte[4], new byte[4], tokenKind, 
tokenService));
 
-        byte[] credentialsBytes = 
DelegationTokenConverter.serialize(credentials);
+        byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
         Credentials deserializedCredentials =
-                DelegationTokenConverter.deserialize(credentialsBytes);
+                HadoopDelegationTokenConverter.deserialize(credentialsBytes);
 
         assertEquals(1, credentials.getAllTokens().size());
         assertEquals(1, deserializedCredentials.getAllTokens().size());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
similarity index 89%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
index c30111575ea..70d1a1cfab4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenUpdaterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopDelegationTokenUpdaterITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.io.Text;
@@ -36,8 +36,8 @@ import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-/** Test for {@link DelegationTokenConverter}. */
-public class DelegationTokenUpdaterITCase {
+/** Test for {@link HadoopDelegationTokenConverter}. */
+public class HadoopDelegationTokenUpdaterITCase {
 
     @Test
     public void 
addCurrentUserCredentialsShouldThrowExceptionWhenNullCredentials() {
@@ -58,7 +58,7 @@ public class DelegationTokenUpdaterITCase {
                     assertThrows(
                             IllegalArgumentException.class,
                             () ->
-                                    
DelegationTokenUpdater.addCurrentUserCredentials(
+                                    
HadoopDelegationTokenUpdater.addCurrentUserCredentials(
                                             credentialsBytes));
             assertTrue(e.getMessage().contains("Illegal credentials"));
         }
@@ -72,13 +72,13 @@ public class DelegationTokenUpdaterITCase {
         credentials.addToken(
                 tokenService, new Token<>(new byte[4], new byte[4], tokenKind, 
tokenService));
 
-        byte[] credentialsBytes = 
DelegationTokenConverter.serialize(credentials);
+        byte[] credentialsBytes = 
HadoopDelegationTokenConverter.serialize(credentials);
 
         try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
             UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
             
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
 
-            DelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
+            
HadoopDelegationTokenUpdater.addCurrentUserCredentials(credentialsBytes);
             ArgumentCaptor<Credentials> argumentCaptor = 
ArgumentCaptor.forClass(Credentials.class);
             verify(userGroupInformation, 
times(1)).addCredentials(argumentCaptor.capture());
             assertTrue(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
similarity index 87%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
index f5e2db75100..5f23695b973 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/HadoopFSDelegationTokenProviderITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/HadoopFSDelegationTokenProviderITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,17 +47,20 @@ class HadoopFSDelegationTokenProviderITCase {
     final Text tokenService1 = new Text("TEST_TOKEN_SERVICE1");
     final Text tokenService2 = new Text("TEST_TOKEN_SERVICE2");
 
-    private class TestDelegationToken extends 
Token<TestDelegationTokenIdentifier> {
+    private class TestDelegationToken extends 
Token<TestHadoopDelegationTokenIdentifier> {
 
         private long newExpiration;
 
         public TestDelegationToken(
-                Text tokenService, TestDelegationTokenIdentifier identifier, 
long newExpiration) {
+                Text tokenService,
+                TestHadoopDelegationTokenIdentifier identifier,
+                long newExpiration) {
             super(identifier.getBytes(), new byte[4], identifier.getKind(), 
tokenService);
             this.newExpiration = newExpiration;
         }
 
-        public TestDelegationToken(Text tokenService, 
TestDelegationTokenIdentifier identifier) {
+        public TestDelegationToken(
+                Text tokenService, TestHadoopDelegationTokenIdentifier 
identifier) {
             this(tokenService, identifier, 0L);
         }
 
@@ -112,14 +115,14 @@ class HadoopFSDelegationTokenProviderITCase {
                             String renewer,
                             Set<FileSystem> fileSystemsToAccess,
                             Credentials credentials) {
-                        TestDelegationTokenIdentifier tokenIdentifier1 =
-                                new TestDelegationTokenIdentifier(NOW);
+                        TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
+                                new TestHadoopDelegationTokenIdentifier(NOW);
                         credentials.addToken(
                                 tokenService1,
                                 new TestDelegationToken(tokenService1, 
tokenIdentifier1, NOW + 1));
 
-                        TestDelegationTokenIdentifier tokenIdentifier2 =
-                                new TestDelegationTokenIdentifier(NOW);
+                        TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
+                                new TestHadoopDelegationTokenIdentifier(NOW);
                         credentials.addToken(
                                 tokenService2,
                                 new TestDelegationToken(tokenService2, 
tokenIdentifier2, NOW + 2));
@@ -155,10 +158,12 @@ class HadoopFSDelegationTokenProviderITCase {
         HadoopFSDelegationTokenProvider provider = new 
HadoopFSDelegationTokenProvider();
         Clock constantClock = Clock.fixed(ofEpochMilli(NOW), 
ZoneId.systemDefault());
         Credentials credentials = new Credentials();
-        TestDelegationTokenIdentifier tokenIdentifier1 = new 
TestDelegationTokenIdentifier(NOW);
+        TestHadoopDelegationTokenIdentifier tokenIdentifier1 =
+                new TestHadoopDelegationTokenIdentifier(NOW);
         credentials.addToken(
                 tokenService1, new TestDelegationToken(tokenService1, 
tokenIdentifier1));
-        TestDelegationTokenIdentifier tokenIdentifier2 = new 
TestDelegationTokenIdentifier(NOW + 1);
+        TestHadoopDelegationTokenIdentifier tokenIdentifier2 =
+                new TestHadoopDelegationTokenIdentifier(NOW + 1);
         credentials.addToken(
                 tokenService2, new TestDelegationToken(tokenService2, 
tokenIdentifier2));
 
@@ -173,7 +178,7 @@ class HadoopFSDelegationTokenProviderITCase {
         Clock constantClock = Clock.fixed(ofEpochMilli(NOW), 
ZoneId.systemDefault());
         long issueDate = NOW + 1;
         AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestDelegationTokenIdentifier(issueDate);
+                new TestHadoopDelegationTokenIdentifier(issueDate);
 
         assertEquals(
                 issueDate,
@@ -188,7 +193,7 @@ class HadoopFSDelegationTokenProviderITCase {
         Clock constantClock = Clock.fixed(ofEpochMilli(NOW), 
ZoneId.systemDefault());
         long issueDate = NOW - 1;
         AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestDelegationTokenIdentifier(issueDate);
+                new TestHadoopDelegationTokenIdentifier(issueDate);
 
         assertEquals(
                 issueDate,
@@ -203,7 +208,7 @@ class HadoopFSDelegationTokenProviderITCase {
         Clock constantClock = Clock.fixed(ofEpochMilli(NOW), 
ZoneId.systemDefault());
         long issueDate = -1;
         AbstractDelegationTokenIdentifier tokenIdentifier =
-                new TestDelegationTokenIdentifier(issueDate);
+                new TestHadoopDelegationTokenIdentifier(issueDate);
 
         assertEquals(
                 NOW,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
similarity index 98%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
index 861e8f536b9..f40baab5c38 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosDelegationTokenManagerITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosDelegationTokenManagerITCase.java
@@ -16,10 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+import org.apache.flink.runtime.security.token.DelegationTokenManager;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
 import org.apache.hadoop.security.UserGroupInformation;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
similarity index 99%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
index 9dd17edd95c..a7dde7c1c2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/KerberosLoginProviderITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/KerberosLoginProviderITCase.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.configuration.Configuration;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
similarity index 86%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
index 0d041d2b2a3..66db6751a6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestDelegationTokenIdentifier.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenIdentifier.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.hadoop.io.Text;
 import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -29,16 +29,16 @@ import java.io.IOException;
  * Example test implementation of {@link AbstractDelegationTokenIdentifier} 
which is used in
  * integration tests.
  */
-public class TestDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
+public class TestHadoopDelegationTokenIdentifier extends 
AbstractDelegationTokenIdentifier {
 
     private static final Text tokenKind = new Text("TEST_TOKEN_KIND");
 
     private long issueDate;
 
     // This is needed for service loader
-    public TestDelegationTokenIdentifier() {}
+    public TestHadoopDelegationTokenIdentifier() {}
 
-    public TestDelegationTokenIdentifier(long issueDate) {
+    public TestHadoopDelegationTokenIdentifier(long issueDate) {
         this.issueDate = issueDate;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
similarity index 87%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
index f78e1524efb..7b692ad8da2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/TestHadoopDelegationTokenProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/hadoop/TestHadoopDelegationTokenProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.security.token;
+package org.apache.flink.runtime.security.token.hadoop;
 
 import org.apache.flink.configuration.Configuration;
 
@@ -24,7 +24,10 @@ import org.apache.hadoop.security.Credentials;
 
 import java.util.Optional;
 
-/** An example implementation of {@link HadoopDelegationTokenProvider} which 
does nothing. */
+/**
+ * An example implementation of {@link
+ * 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider} 
which does nothing.
+ */
 public class TestHadoopDelegationTokenProvider implements 
HadoopDelegationTokenProvider {
 
     @Override
diff --git 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
similarity index 81%
rename from 
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
rename to 
flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
index 55922e83aaf..74e3982f7bc 100644
--- 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.HadoopDelegationTokenProvider
+++ 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenProvider
@@ -13,5 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.TestHadoopDelegationTokenProvider
-org.apache.flink.runtime.security.token.ExceptionThrowingHadoopDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenProvider
+org.apache.flink.runtime.security.token.hadoop.ExceptionThrowingHadoopDelegationTokenProvider
diff --git 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index 59e6d84d595..611dd95044e 100644
--- 
a/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
+++ 
b/flink-runtime/src/test/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.runtime.security.token.TestDelegationTokenIdentifier
+org.apache.flink.runtime.security.token.hadoop.TestHadoopDelegationTokenIdentifier
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 3bfa9a8d843..235400fe7ce 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -52,10 +52,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessSpec;
 import org.apache.flink.runtime.jobmanager.JobManagerProcessUtils;
-import org.apache.flink.runtime.security.token.DelegationTokenConverter;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosDelegationTokenManager;
-import org.apache.flink.runtime.security.token.KerberosLoginProvider;
+import 
org.apache.flink.runtime.security.token.hadoop.HadoopDelegationTokenConverter;
+import 
org.apache.flink.runtime.security.token.hadoop.KerberosDelegationTokenManager;
+import org.apache.flink.runtime.security.token.hadoop.KerberosLoginProvider;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkException;
@@ -1300,7 +1300,7 @@ public class YarnClusterDescriptor implements 
ClusterDescriptor<ApplicationId> {
                 new KerberosDelegationTokenManager(flinkConfiguration, null, 
null);
         delegationTokenManager.obtainDelegationTokens(credentials);
 
-        ByteBuffer tokens = 
ByteBuffer.wrap(DelegationTokenConverter.serialize(credentials));
+        ByteBuffer tokens = 
ByteBuffer.wrap(HadoopDelegationTokenConverter.serialize(credentials));
         containerLaunchContext.setTokens(tokens);
 
         LOG.info("Delegation tokens added to the AM container.");


Reply via email to