This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f3b63b7c962 [FLINK-31656][runtime][security] Obtain delegation tokens early to support external file system usage in blob server
f3b63b7c962 is described below

commit f3b63b7c962d9548d739698e00c4b5dde9a715a5
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Mar 29 11:53:28 2023 +0200

    [FLINK-31656][runtime][security] Obtain delegation tokens early to support external file system usage in blob server
---
 .../flink/runtime/blob/FileSystemBlobStore.java    | 17 ++++++++---
 .../runtime/entrypoint/ClusterEntrypoint.java      | 15 ++++++----
 .../highavailability/FileSystemJobResultStore.java | 25 +++++++++++++---
 .../flink/runtime/minicluster/MiniCluster.java     | 26 +++++++++--------
 .../token/DefaultDelegationTokenManager.java       | 16 ++++++++++-
 .../security/token/DelegationTokenManager.java     | 10 ++++++-
 .../security/token/NoOpDelegationTokenManager.java |  5 ++++
 .../runtime/blob/FileSystemBlobStoreTest.java      |  3 +-
 .../runtime/entrypoint/ClusterEntrypointTest.java  | 33 ++++++++++++++++++++++
 ...FileSystemJobResultStoreFileOperationsTest.java |  3 ++
 .../ExceptionThrowingDelegationTokenReceiver.java  |  4 +++
 11 files changed, 129 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index e8b53cc9170..0e902c9144a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -49,6 +49,8 @@ public class FileSystemBlobStore implements BlobStoreService {
     /** The file system in which blobs are stored. */
     private final FileSystem fileSystem;
 
+    private volatile boolean basePathCreated;
+
     /** The base path of the blob store. */
     private final String basePath;
 
@@ -57,22 +59,29 @@ public class FileSystemBlobStore implements 
BlobStoreService {
 
     public FileSystemBlobStore(FileSystem fileSystem, String storagePath) 
throws IOException {
         this.fileSystem = checkNotNull(fileSystem);
+        this.basePathCreated = false;
         this.basePath = checkNotNull(storagePath) + "/" + BLOB_PATH_NAME;
+    }
 
-        LOG.info("Creating highly available BLOB storage directory at {}", 
basePath);
-
-        fileSystem.mkdirs(new Path(basePath));
-        LOG.debug("Created highly available BLOB storage directory at {}", 
basePath);
+    private void createBasePathIfNeeded() throws IOException {
+        if (!basePathCreated) {
+            LOG.info("Creating highly available BLOB storage directory at {}", 
basePath);
+            fileSystem.mkdirs(new Path(basePath));
+            LOG.debug("Created highly available BLOB storage directory at {}", 
basePath);
+            basePathCreated = true;
+        }
     }
 
     // - Put ------------------------------------------------------------------
 
     @Override
     public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws 
IOException {
+        createBasePathIfNeeded();
         return put(localFile, BlobUtils.getStorageLocationPath(basePath, 
jobId, blobKey));
     }
 
     private boolean put(File fromFile, String toBlobPath) throws IOException {
+        createBasePathIfNeeded();
         try (FSDataOutputStream os =
                 fileSystem.create(new Path(toBlobPath), 
FileSystem.WriteMode.OVERWRITE)) {
             LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 094a8c85a2b..9aebd8bec30 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -379,6 +379,15 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                     Executors.newFixedThreadPool(
                             ClusterEntrypointUtils.getPoolSize(configuration),
                             new ExecutorThreadFactory("cluster-io"));
+            delegationTokenManager =
+                    DefaultDelegationTokenManagerFactory.create(
+                            configuration,
+                            pluginManager,
+                            commonRpcService.getScheduledExecutor(),
+                            ioExecutor);
+            // Obtaining delegation tokens and propagating them to the local 
JVM receivers in a
+            // one-time fashion is required because BlobServer may connect to 
external file systems
+            delegationTokenManager.obtainDelegationTokens();
             haServices = createHaServices(configuration, ioExecutor, 
rpcSystem);
             blobServer =
                     BlobUtils.createBlobServer(
@@ -388,12 +397,6 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
             blobServer.start();
             configuration.setString(BlobServerOptions.PORT, 
String.valueOf(blobServer.getPort()));
             heartbeatServices = createHeartbeatServices(configuration);
-            delegationTokenManager =
-                    DefaultDelegationTokenManagerFactory.create(
-                            configuration,
-                            pluginManager,
-                            commonRpcService.getScheduledExecutor(),
-                            ioExecutor);
             metricRegistry = createMetricRegistry(configuration, 
pluginManager, rpcSystem);
 
             final RpcService metricQueryServiceRpcService =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index c3fc225596d..36284366e31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -40,6 +40,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashSet;
@@ -54,6 +57,8 @@ import static 
org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
  */
 public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore 
{
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemJobResultStore.class);
+
     @VisibleForTesting static final String FILE_EXTENSION = ".json";
     @VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + 
FILE_EXTENSION;
 
@@ -71,18 +76,17 @@ public class FileSystemJobResultStore extends 
AbstractThreadsafeJobResultStore {
 
     private final FileSystem fileSystem;
 
+    private volatile boolean basePathCreated;
+
     private final Path basePath;
 
     private final boolean deleteOnCommit;
 
     @VisibleForTesting
-    FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean 
deleteOnCommit)
-            throws IOException {
+    FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean 
deleteOnCommit) {
         this.fileSystem = fileSystem;
         this.basePath = basePath;
         this.deleteOnCommit = deleteOnCommit;
-
-        this.fileSystem.mkdirs(this.basePath);
     }
 
     public static FileSystemJobResultStore fromConfiguration(Configuration 
config)
@@ -104,6 +108,15 @@ public class FileSystemJobResultStore extends 
AbstractThreadsafeJobResultStore {
         return new FileSystemJobResultStore(basePath.getFileSystem(), 
basePath, deleteOnCommit);
     }
 
+    private void createBasePathIfNeeded() throws IOException {
+        if (!basePathCreated) {
+            LOG.info("Creating highly available job result storage directory 
at {}", basePath);
+            fileSystem.mkdirs(basePath);
+            LOG.info("Created highly available job result storage directory at 
{}", basePath);
+            basePathCreated = true;
+        }
+    }
+
     public static String createDefaultJobResultStorePath(String baseDir, 
String clusterId) {
         return baseDir + "/job-result-store/" + clusterId;
     }
@@ -137,6 +150,8 @@ public class FileSystemJobResultStore extends 
AbstractThreadsafeJobResultStore {
 
     @Override
     public void createDirtyResultInternal(JobResultEntry jobResultEntry) 
throws IOException {
+        createBasePathIfNeeded();
+
         final Path path = constructDirtyPath(jobResultEntry.getJobId());
         try (OutputStream os = fileSystem.create(path, 
FileSystem.WriteMode.NO_OVERWRITE)) {
             mapper.writeValue(
@@ -177,6 +192,8 @@ public class FileSystemJobResultStore extends 
AbstractThreadsafeJobResultStore {
 
     @Override
     public Set<JobResult> getDirtyResultsInternal() throws IOException {
+        createBasePathIfNeeded();
+
         final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
 
         Preconditions.checkState(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index ad708d96605..1cb0a52213f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -418,6 +418,21 @@ public class MiniCluster implements AutoCloseableAsync {
                                 
ClusterEntrypointUtils.getPoolSize(configuration),
                                 new ExecutorThreadFactory("mini-cluster-io"));
 
+                delegationTokenManager =
+                        DefaultDelegationTokenManagerFactory.create(
+                                configuration,
+                                miniClusterConfiguration.getPluginManager(),
+                                commonRpcService.getScheduledExecutor(),
+                                ioExecutor);
+                // Obtaining delegation tokens and propagating them to the 
local JVM receivers in a
+                // one-time fashion is required because BlobServer may connect 
to external file
+                // systems
+                delegationTokenManager.obtainDelegationTokens();
+
+                delegationTokenReceiverRepository =
+                        new DelegationTokenReceiverRepository(
+                                configuration, 
miniClusterConfiguration.getPluginManager());
+
                 haServicesFactory = 
createHighAvailabilityServicesFactory(configuration);
 
                 haServices = createHighAvailabilityServices(configuration, 
ioExecutor);
@@ -431,17 +446,6 @@ public class MiniCluster implements AutoCloseableAsync {
 
                 heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
-                delegationTokenManager =
-                        DefaultDelegationTokenManagerFactory.create(
-                                configuration,
-                                miniClusterConfiguration.getPluginManager(),
-                                commonRpcService.getScheduledExecutor(),
-                                ioExecutor);
-
-                delegationTokenReceiverRepository =
-                        new DelegationTokenReceiverRepository(
-                                configuration, 
miniClusterConfiguration.getPluginManager());
-
                 blobCacheService =
                         BlobUtils.createBlobCacheService(
                                 configuration,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index 9e25e47dcc1..c144037d223 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -213,6 +213,20 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
         LOG.info("Delegation tokens obtained successfully");
     }
 
+    @Override
+    public void obtainDelegationTokens() throws Exception {
+        LOG.info("Obtaining delegation tokens");
+        DelegationTokenContainer container = new DelegationTokenContainer();
+        obtainDelegationTokensAndGetNextRenewal(container);
+        LOG.info("Delegation tokens obtained successfully");
+
+        if (container.hasTokens()) {
+            delegationTokenReceiverRepository.onNewTokensObtained(container);
+        } else {
+            LOG.warn("No tokens obtained so skipping notifications");
+        }
+    }
+
     protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(
             DelegationTokenContainer container) {
         return delegationTokenProviders.values().stream()
@@ -281,7 +295,7 @@ public class DefaultDelegationTokenManager implements 
DelegationTokenManager {
                 
listener.onNewTokensObtained(InstantiationUtil.serializeObject(container));
                 LOG.info("Listener notified successfully");
             } else {
-                LOG.warn("No tokens obtained so skipping listener 
notification");
+                LOG.warn("No tokens obtained so skipping notifications");
             }
 
             if (nextRenewal.isPresent()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index 57f53c75ccc..3452cf5dcce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -45,9 +45,17 @@ public interface DelegationTokenManager {
      */
     void obtainDelegationTokens(DelegationTokenContainer container) throws 
Exception;
 
+    /**
+     * Obtains new tokens in a one-time fashion and automatically distributes 
them to all local JVM
+     * receivers.
+     */
+    void obtainDelegationTokens() throws Exception;
+
     /**
      * Creates a re-occurring task which obtains new tokens and automatically 
distributes them to
-     * task managers.
+     * all receivers (in local JVM as well as in registered task managers 
too). Task manager
+     * distribution must be implemented in the listener logic in order to keep 
the manager logic
+     * clean.
      */
     void start(Listener listener) throws Exception;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
index cab68f825e6..655ced1c74d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
@@ -38,6 +38,11 @@ public class NoOpDelegationTokenManager implements 
DelegationTokenManager {
         LOG.debug("obtainDelegationTokens");
     }
 
+    @Override
+    public void obtainDelegationTokens() {
+        LOG.debug("obtainDelegationTokens");
+    }
+
     @Override
     public void start(Listener listener) {
         LOG.debug("start");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
index 479f58475fc..c73c403712a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
@@ -74,7 +74,8 @@ class FileSystemBlobStoreTest {
 
         final JobID jobId = new JobID();
         final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
-        assertThat(getBlobDirectoryPath()).isEmptyDirectory();
+        // Blob store operations are creating the base directory on-the-fly
+        assertThat(getBlobDirectoryPath()).doesNotExist();
 
         final boolean successfullyWritten =
                 testInstance.put(temporaryFile.toFile(), jobId, blobKey);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 5b3926996df..56923ff2871 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -44,6 +44,8 @@ import 
org.apache.flink.runtime.rest.SessionRestEndpointFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcSystemUtils;
+import 
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider;
+import 
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver;
 import org.apache.flink.runtime.testutils.TestJvmProcess;
 import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
 import org.apache.flink.runtime.util.SignalHandler;
@@ -53,6 +55,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.concurrent.ScheduledExecutor;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -89,6 +92,14 @@ public class ClusterEntrypointTest extends TestLogger {
     @Before
     public void before() {
         flinkConfig = new Configuration();
+        ExceptionThrowingDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenReceiver.reset();
+    }
+
+    @After
+    public void after() {
+        ExceptionThrowingDelegationTokenProvider.reset();
+        ExceptionThrowingDelegationTokenReceiver.reset();
     }
 
     @Test(expected = IllegalConfigurationException.class)
@@ -98,6 +109,28 @@ public class ClusterEntrypointTest extends TestLogger {
         fail("Entrypoint initialization is supposed to fail");
     }
 
+    @Test
+    public void testClusterStartShouldObtainTokens() throws Exception {
+        ExceptionThrowingDelegationTokenProvider.addToken.set(true);
+        final HighAvailabilityServices testingHaService =
+                new TestingHighAvailabilityServicesBuilder().build();
+        final TestingEntryPoint testingEntryPoint =
+                new TestingEntryPoint.Builder()
+                        .setConfiguration(flinkConfig)
+                        .setHighAvailabilityServices(testingHaService)
+                        .build();
+
+        final CompletableFuture<ApplicationStatus> appStatusFuture =
+                startClusterEntrypoint(testingEntryPoint);
+
+        testingEntryPoint.closeAsync();
+        assertThat(
+                appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+                is(ApplicationStatus.UNKNOWN));
+        assertThat(
+                
ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(), 
is(1));
+    }
+
     @Test
     public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
         final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
index cf7c052d02b..fb7a245d5fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
@@ -117,6 +117,9 @@ public class FileSystemJobResultStoreFileOperationsTest {
 
         fileSystemJobResultStore =
                 new FileSystemJobResultStore(basePath.getFileSystem(), 
basePath, false);
+        // Result store operations are creating the base directory on-the-fly
+        assertThat(emptyBaseDirectory).doesNotExist();
+        fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
         assertThat(emptyBaseDirectory).exists().isDirectory();
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index bf0329f9595..4948c859a88 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -32,11 +32,14 @@ public class ExceptionThrowingDelegationTokenReceiver 
implements DelegationToken
             ThreadLocal.withInitial(() -> Boolean.FALSE);
     public static volatile ThreadLocal<Boolean> constructed =
             ThreadLocal.withInitial(() -> Boolean.FALSE);
+    public static volatile ThreadLocal<Integer> onNewTokensObtainedCallCount =
+            ThreadLocal.withInitial(() -> 0);
 
     public static void reset() {
         throwInInit.set(false);
         throwInUsage.set(false);
         constructed.set(false);
+        onNewTokensObtainedCallCount.set(0);
     }
 
     public ExceptionThrowingDelegationTokenReceiver() {
@@ -60,5 +63,6 @@ public class ExceptionThrowingDelegationTokenReceiver 
implements DelegationToken
         if (throwInUsage.get()) {
             throw new IllegalArgumentException();
         }
+        onNewTokensObtainedCallCount.set(onNewTokensObtainedCallCount.get() + 
1);
     }
 }

Reply via email to