tillrohrmann commented on a change in pull request #18164:
URL: https://github.com/apache/flink/pull/18164#discussion_r785873667
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
##########
@@ -99,45 +100,90 @@ private static BlobStoreService
createFileSystemBlobStore(Configuration configur
}
/**
- * Creates a local storage directory for a blob service under the
configuration parameter given
- * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is
<tt>null</tt> or empty, we will
- * fall back to Flink's temp directories (given by {@link
- * org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one
among them at random.
+ * Creates the {@link BlobServer} from the given configuration, fallback
storage directory and
+ * blob store.
*
- * @param config Flink configuration
- * @return a new local storage directory
- * @throws IOException thrown if the local file storage cannot be created
or is not usable
+ * @param configuration for the BlobServer
+ * @param fallbackStorageDirectory fallback storage directory that is used
if no other directory
+ * has been explicitly configured
+ * @param blobStore blob store to use for this blob server
+ * @return new blob server instance
+ * @throws IOException if we could not create the blob storage directory
*/
- static File initLocalStorageDirectory(Configuration config) throws
IOException {
+ public static BlobServer createBlobServer(
+ Configuration configuration,
+ Reference<File> fallbackStorageDirectory,
+ BlobStore blobStore)
+ throws IOException {
+ final Reference<File> storageDirectory =
+ createBlobStorageDirectory(configuration,
fallbackStorageDirectory);
+ return new BlobServer(configuration, storageDirectory, blobStore);
+ }
- String basePath =
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
+ /**
+ * Creates the {@link BlobCacheService} from the given configuration,
fallback storage
+ * directory, blob view and blob server address.
+ *
+ * @param configuration for the BlobCacheService
+ * @param fallbackStorageDirectory fallback storage directory
+ * @param blobView blob view
+ * @param serverAddress blob server address
+ * @return new blob cache service instance
+ * @throws IOException if we could not create the blob storage directory
+ */
+ public static BlobCacheService createBlobCacheService(
+ Configuration configuration,
+ Reference<File> fallbackStorageDirectory,
+ BlobView blobView,
+ @Nullable InetSocketAddress serverAddress)
+ throws IOException {
+ final Reference<File> storageDirectory =
+ createBlobStorageDirectory(configuration,
fallbackStorageDirectory);
+ return new BlobCacheService(configuration, storageDirectory, blobView,
serverAddress);
+ }
+
+ static Reference<File> createBlobStorageDirectory(
+ Configuration configuration, @Nullable Reference<File>
fallbackStorageDirectory)
+ throws IOException {
+ final String basePath =
configuration.getString(BlobServerOptions.STORAGE_DIRECTORY);
- File baseDir;
+ File baseDir = null;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
- final String[] tmpDirPaths =
ConfigurationUtils.parseTempDirectories(config);
- baseDir = new
File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
+ if (fallbackStorageDirectory != null) {
+ baseDir = fallbackStorageDirectory.deref();
+
+ if (baseDir.mkdirs() || baseDir.exists()) {
+ return fallbackStorageDirectory;
+ }
+ }
} else {
baseDir = new File(basePath);
- }
- File storageDir;
+ File storageDir;
- // NOTE: although we will be using UUIDs, there may be collisions
- int maxAttempts = 10;
- for (int attempt = 0; attempt < maxAttempts; attempt++) {
- storageDir =
- new File(baseDir, String.format("blobStore-%s",
UUID.randomUUID().toString()));
+ // NOTE: although we will be using UUIDs, there may be collisions
+ int maxAttempts = 10;
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ storageDir =
+ new File(
+ baseDir,
+ String.format("blobStore-%s",
UUID.randomUUID().toString()));
- // Create the storage dir if it doesn't exist. Only return it when
the operation was
- // successful.
- if (storageDir.mkdirs()) {
- return storageDir;
+ // Create the storage dir if it doesn't exist. Only return it
when the operation was
+ // successful.
+ if (storageDir.mkdirs()) {
+ return Reference.owned(storageDir);
+ }
}
}
- // max attempts exceeded to find a storage directory
- throw new IOException(
- "Could not create storage directory for BLOB store in '" +
baseDir + "'.");
+ if (baseDir != null) {
+ throw new IOException(
+ "Could not create storage directory for BLOB store in '" +
baseDir + "'.");
+ } else {
+ throw new IOException(
+ "Could not create storage directory for BLOB store because
no storage directory has been specified.");
Review comment:
Will do.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]