This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f3b63b7c962 [FLINK-31656][runtime][security] Obtain delegation tokens
early to support external file system usage in blob server
f3b63b7c962 is described below
commit f3b63b7c962d9548d739698e00c4b5dde9a715a5
Author: Gabor Somogyi <[email protected]>
AuthorDate: Wed Mar 29 11:53:28 2023 +0200
[FLINK-31656][runtime][security] Obtain delegation tokens early to support
external file system usage in blob server
---
.../flink/runtime/blob/FileSystemBlobStore.java | 17 ++++++++---
.../runtime/entrypoint/ClusterEntrypoint.java | 15 ++++++----
.../highavailability/FileSystemJobResultStore.java | 25 +++++++++++++---
.../flink/runtime/minicluster/MiniCluster.java | 26 +++++++++--------
.../token/DefaultDelegationTokenManager.java | 16 ++++++++++-
.../security/token/DelegationTokenManager.java | 10 ++++++-
.../security/token/NoOpDelegationTokenManager.java | 5 ++++
.../runtime/blob/FileSystemBlobStoreTest.java | 3 +-
.../runtime/entrypoint/ClusterEntrypointTest.java | 33 ++++++++++++++++++++++
...FileSystemJobResultStoreFileOperationsTest.java | 3 ++
.../ExceptionThrowingDelegationTokenReceiver.java | 4 +++
11 files changed, 129 insertions(+), 28 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index e8b53cc9170..0e902c9144a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -49,6 +49,8 @@ public class FileSystemBlobStore implements BlobStoreService {
/** The file system in which blobs are stored. */
private final FileSystem fileSystem;
+ private volatile boolean basePathCreated;
+
/** The base path of the blob store. */
private final String basePath;
@@ -57,22 +59,29 @@ public class FileSystemBlobStore implements
BlobStoreService {
public FileSystemBlobStore(FileSystem fileSystem, String storagePath)
throws IOException {
this.fileSystem = checkNotNull(fileSystem);
+ this.basePathCreated = false;
this.basePath = checkNotNull(storagePath) + "/" + BLOB_PATH_NAME;
+ }
- LOG.info("Creating highly available BLOB storage directory at {}",
basePath);
-
- fileSystem.mkdirs(new Path(basePath));
- LOG.debug("Created highly available BLOB storage directory at {}",
basePath);
+ private void createBasePathIfNeeded() throws IOException {
+ if (!basePathCreated) {
+ LOG.info("Creating highly available BLOB storage directory at {}",
basePath);
+ fileSystem.mkdirs(new Path(basePath));
+ LOG.debug("Created highly available BLOB storage directory at {}",
basePath);
+ basePathCreated = true;
+ }
}
// - Put ------------------------------------------------------------------
@Override
public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws
IOException {
+ createBasePathIfNeeded();
return put(localFile, BlobUtils.getStorageLocationPath(basePath,
jobId, blobKey));
}
private boolean put(File fromFile, String toBlobPath) throws IOException {
+ createBasePathIfNeeded();
try (FSDataOutputStream os =
fileSystem.create(new Path(toBlobPath),
FileSystem.WriteMode.OVERWRITE)) {
LOG.debug("Copying from {} to {}.", fromFile, toBlobPath);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 094a8c85a2b..9aebd8bec30 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -379,6 +379,15 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
+ delegationTokenManager =
+ DefaultDelegationTokenManagerFactory.create(
+ configuration,
+ pluginManager,
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
+ // Obtaining delegation tokens and propagating them to the local
JVM receivers in a
+ // one-time fashion is required because BlobServer may connect to
external file systems
+ delegationTokenManager.obtainDelegationTokens();
haServices = createHaServices(configuration, ioExecutor,
rpcSystem);
blobServer =
BlobUtils.createBlobServer(
@@ -388,12 +397,6 @@ public abstract class ClusterEntrypoint implements
AutoCloseableAsync, FatalErro
blobServer.start();
configuration.setString(BlobServerOptions.PORT,
String.valueOf(blobServer.getPort()));
heartbeatServices = createHeartbeatServices(configuration);
- delegationTokenManager =
- DefaultDelegationTokenManagerFactory.create(
- configuration,
- pluginManager,
- commonRpcService.getScheduledExecutor(),
- ioExecutor);
metricRegistry = createMetricRegistry(configuration,
pluginManager, rpcSystem);
final RpcService metricQueryServiceRpcService =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index c3fc225596d..36284366e31 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -40,6 +40,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
@@ -54,6 +57,8 @@ import static
org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
*/
public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore
{
+ private static final Logger LOG =
LoggerFactory.getLogger(FileSystemJobResultStore.class);
+
@VisibleForTesting static final String FILE_EXTENSION = ".json";
@VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" +
FILE_EXTENSION;
@@ -71,18 +76,17 @@ public class FileSystemJobResultStore extends
AbstractThreadsafeJobResultStore {
private final FileSystem fileSystem;
+ private volatile boolean basePathCreated;
+
private final Path basePath;
private final boolean deleteOnCommit;
@VisibleForTesting
- FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean
deleteOnCommit)
- throws IOException {
+ FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean
deleteOnCommit) {
this.fileSystem = fileSystem;
this.basePath = basePath;
this.deleteOnCommit = deleteOnCommit;
-
- this.fileSystem.mkdirs(this.basePath);
}
public static FileSystemJobResultStore fromConfiguration(Configuration
config)
@@ -104,6 +108,15 @@ public class FileSystemJobResultStore extends
AbstractThreadsafeJobResultStore {
return new FileSystemJobResultStore(basePath.getFileSystem(),
basePath, deleteOnCommit);
}
+ private void createBasePathIfNeeded() throws IOException {
+ if (!basePathCreated) {
+ LOG.info("Creating highly available job result storage directory
at {}", basePath);
+ fileSystem.mkdirs(basePath);
+ LOG.info("Created highly available job result storage directory at
{}", basePath);
+ basePathCreated = true;
+ }
+ }
+
public static String createDefaultJobResultStorePath(String baseDir,
String clusterId) {
return baseDir + "/job-result-store/" + clusterId;
}
@@ -137,6 +150,8 @@ public class FileSystemJobResultStore extends
AbstractThreadsafeJobResultStore {
@Override
public void createDirtyResultInternal(JobResultEntry jobResultEntry)
throws IOException {
+ createBasePathIfNeeded();
+
final Path path = constructDirtyPath(jobResultEntry.getJobId());
try (OutputStream os = fileSystem.create(path,
FileSystem.WriteMode.NO_OVERWRITE)) {
mapper.writeValue(
@@ -177,6 +192,8 @@ public class FileSystemJobResultStore extends
AbstractThreadsafeJobResultStore {
@Override
public Set<JobResult> getDirtyResultsInternal() throws IOException {
+ createBasePathIfNeeded();
+
final FileStatus[] statuses = fileSystem.listStatus(this.basePath);
Preconditions.checkState(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index ad708d96605..1cb0a52213f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -418,6 +418,21 @@ public class MiniCluster implements AutoCloseableAsync {
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
+ delegationTokenManager =
+ DefaultDelegationTokenManagerFactory.create(
+ configuration,
+ miniClusterConfiguration.getPluginManager(),
+ commonRpcService.getScheduledExecutor(),
+ ioExecutor);
+ // Obtaining delegation tokens and propagating them to the
local JVM receivers in a
+ // one-time fashion is required because BlobServer may connect
to external file
+ // systems
+ delegationTokenManager.obtainDelegationTokens();
+
+ delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(
+ configuration,
miniClusterConfiguration.getPluginManager());
+
haServicesFactory =
createHighAvailabilityServicesFactory(configuration);
haServices = createHighAvailabilityServices(configuration,
ioExecutor);
@@ -431,17 +446,6 @@ public class MiniCluster implements AutoCloseableAsync {
heartbeatServices =
HeartbeatServices.fromConfiguration(configuration);
- delegationTokenManager =
- DefaultDelegationTokenManagerFactory.create(
- configuration,
- miniClusterConfiguration.getPluginManager(),
- commonRpcService.getScheduledExecutor(),
- ioExecutor);
-
- delegationTokenReceiverRepository =
- new DelegationTokenReceiverRepository(
- configuration,
miniClusterConfiguration.getPluginManager());
-
blobCacheService =
BlobUtils.createBlobCacheService(
configuration,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
index 9e25e47dcc1..c144037d223 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManager.java
@@ -213,6 +213,20 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
LOG.info("Delegation tokens obtained successfully");
}
+ @Override
+ public void obtainDelegationTokens() throws Exception {
+ LOG.info("Obtaining delegation tokens");
+ DelegationTokenContainer container = new DelegationTokenContainer();
+ obtainDelegationTokensAndGetNextRenewal(container);
+ LOG.info("Delegation tokens obtained successfully");
+
+ if (container.hasTokens()) {
+ delegationTokenReceiverRepository.onNewTokensObtained(container);
+ } else {
+ LOG.warn("No tokens obtained so skipping notifications");
+ }
+ }
+
protected Optional<Long> obtainDelegationTokensAndGetNextRenewal(
DelegationTokenContainer container) {
return delegationTokenProviders.values().stream()
@@ -281,7 +295,7 @@ public class DefaultDelegationTokenManager implements
DelegationTokenManager {
listener.onNewTokensObtained(InstantiationUtil.serializeObject(container));
LOG.info("Listener notified successfully");
} else {
- LOG.warn("No tokens obtained so skipping listener
notification");
+ LOG.warn("No tokens obtained so skipping notifications");
}
if (nextRenewal.isPresent()) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
index 57f53c75ccc..3452cf5dcce 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/DelegationTokenManager.java
@@ -45,9 +45,17 @@ public interface DelegationTokenManager {
*/
void obtainDelegationTokens(DelegationTokenContainer container) throws
Exception;
+ /**
+ * Obtains new tokens in a one-time fashion and automatically distributes
them to all local JVM
+ * receivers.
+ */
+ void obtainDelegationTokens() throws Exception;
+
/**
* Creates a re-occurring task which obtains new tokens and automatically
distributes them to
- * task managers.
+ * all receivers (in the local JVM as well as in registered task managers).
Task manager
+ * distribution must be implemented in the listener logic in order to keep
the manager logic
+ * clean.
*/
void start(Listener listener) throws Exception;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
index cab68f825e6..655ced1c74d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/security/token/NoOpDelegationTokenManager.java
@@ -38,6 +38,11 @@ public class NoOpDelegationTokenManager implements
DelegationTokenManager {
LOG.debug("obtainDelegationTokens");
}
+ @Override
+ public void obtainDelegationTokens() {
+ LOG.debug("obtainDelegationTokens");
+ }
+
@Override
public void start(Listener listener) {
LOG.debug("start");
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
index 479f58475fc..c73c403712a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/FileSystemBlobStoreTest.java
@@ -74,7 +74,8 @@ class FileSystemBlobStoreTest {
final JobID jobId = new JobID();
final BlobKey blobKey = createPermanentBlobKeyFromFile(temporaryFile);
- assertThat(getBlobDirectoryPath()).isEmptyDirectory();
+ // Blob store operations create the base directory on the fly
+ assertThat(getBlobDirectoryPath()).doesNotExist();
final boolean successfullyWritten =
testInstance.put(temporaryFile.toFile(), jobId, blobKey);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
index 5b3926996df..56923ff2871 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointTest.java
@@ -44,6 +44,8 @@ import
org.apache.flink.runtime.rest.SessionRestEndpointFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
+import
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenProvider;
+import
org.apache.flink.runtime.security.token.ExceptionThrowingDelegationTokenReceiver;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.TestingClusterEntrypointProcess;
import org.apache.flink.runtime.util.SignalHandler;
@@ -53,6 +55,7 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -89,6 +92,14 @@ public class ClusterEntrypointTest extends TestLogger {
@Before
public void before() {
flinkConfig = new Configuration();
+ ExceptionThrowingDelegationTokenProvider.reset();
+ ExceptionThrowingDelegationTokenReceiver.reset();
+ }
+
+ @After
+ public void after() {
+ ExceptionThrowingDelegationTokenProvider.reset();
+ ExceptionThrowingDelegationTokenReceiver.reset();
}
@Test(expected = IllegalConfigurationException.class)
@@ -98,6 +109,28 @@ public class ClusterEntrypointTest extends TestLogger {
fail("Entrypoint initialization is supposed to fail");
}
+ @Test
+ public void testClusterStartShouldObtainTokens() throws Exception {
+ ExceptionThrowingDelegationTokenProvider.addToken.set(true);
+ final HighAvailabilityServices testingHaService =
+ new TestingHighAvailabilityServicesBuilder().build();
+ final TestingEntryPoint testingEntryPoint =
+ new TestingEntryPoint.Builder()
+ .setConfiguration(flinkConfig)
+ .setHighAvailabilityServices(testingHaService)
+ .build();
+
+ final CompletableFuture<ApplicationStatus> appStatusFuture =
+ startClusterEntrypoint(testingEntryPoint);
+
+ testingEntryPoint.closeAsync();
+ assertThat(
+ appStatusFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
+ is(ApplicationStatus.UNKNOWN));
+ assertThat(
+
ExceptionThrowingDelegationTokenReceiver.onNewTokensObtainedCallCount.get(),
is(1));
+ }
+
@Test
public void testCloseAsyncShouldNotCleanUpHAData() throws Exception {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
index cf7c052d02b..fb7a245d5fa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStoreFileOperationsTest.java
@@ -117,6 +117,9 @@ public class FileSystemJobResultStoreFileOperationsTest {
fileSystemJobResultStore =
new FileSystemJobResultStore(basePath.getFileSystem(),
basePath, false);
+ // Result store operations create the base directory on the fly
+ assertThat(emptyBaseDirectory).doesNotExist();
+ fileSystemJobResultStore.createDirtyResult(DUMMY_JOB_RESULT_ENTRY);
assertThat(emptyBaseDirectory).exists().isDirectory();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
index bf0329f9595..4948c859a88 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/ExceptionThrowingDelegationTokenReceiver.java
@@ -32,11 +32,14 @@ public class ExceptionThrowingDelegationTokenReceiver
implements DelegationToken
ThreadLocal.withInitial(() -> Boolean.FALSE);
public static volatile ThreadLocal<Boolean> constructed =
ThreadLocal.withInitial(() -> Boolean.FALSE);
+ public static volatile ThreadLocal<Integer> onNewTokensObtainedCallCount =
+ ThreadLocal.withInitial(() -> 0);
public static void reset() {
throwInInit.set(false);
throwInUsage.set(false);
constructed.set(false);
+ onNewTokensObtainedCallCount.set(0);
}
public ExceptionThrowingDelegationTokenReceiver() {
@@ -60,5 +63,6 @@ public class ExceptionThrowingDelegationTokenReceiver
implements DelegationToken
if (throwInUsage.get()) {
throw new IllegalArgumentException();
}
+ onNewTokensObtainedCallCount.set(onNewTokensObtainedCallCount.get() +
1);
}
}