This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 525f6bc818eb7f15a19fa81421584920de8f8876
Author: Yu Chen <yuchen.e...@gmail.com>
AuthorDate: Mon Jan 8 09:45:27 2024 +0800

    [FLINK-33434][runtime] Generalize TaskManager file upload function to 
support different fileType (baseDir)
---
 .../runtime/resourcemanager/ResourceManager.java   | 18 +++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java    | 25 +++++++++++++++++++++-
 .../rest/handler/LeaderRetrievalHandler.java       |  5 +++++
 .../taskmanager/TaskManagerCustomLogHandler.java   |  8 +++++--
 .../flink/runtime/taskexecutor/TaskExecutor.java   | 21 ++++++++++++++----
 .../runtime/taskexecutor/TaskExecutorGateway.java  | 15 ++++++++++++-
 .../TaskExecutorGatewayDecoratorBase.java          |  9 +++++++-
 .../utils/TestingResourceManagerGateway.java       | 25 ++++++++++++++++++++++
 .../taskexecutor/TestingTaskExecutorGateway.java   |  9 +++++++-
 9 files changed, 122 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index bd6fb36eda5..400289aa849 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -88,6 +88,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -811,22 +812,33 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
     @Override
     public CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByName(
             ResourceID taskManagerId, String fileName, Time timeout) {
+        return requestTaskManagerFileUploadByNameAndType(
+                taskManagerId, fileName, FileType.LOG, 
Duration.ofMillis(timeout.toMilliseconds()));
+    }
+
+    @Override
+    public CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByNameAndType(
+            ResourceID taskManagerId, String fileName, FileType fileType, 
Duration timeout) {
         log.debug(
-                "Request upload of file {} from TaskExecutor {}.",
+                "Request upload of file {} (type: {}) from TaskExecutor {}.",
                 fileName,
+                fileType,
                 taskManagerId.getStringWithMetadata());
 
         final WorkerRegistration<WorkerType> taskExecutor = 
taskExecutors.get(taskManagerId);
 
         if (taskExecutor == null) {
             log.debug(
-                    "Request upload of file {} from unregistered TaskExecutor 
{}.",
+                    "Request upload of file {} (type: {}) from unregistered 
TaskExecutor {}.",
                     fileName,
+                    fileType,
                     taskManagerId.getStringWithMetadata());
             return FutureUtils.completedExceptionally(
                     new UnknownTaskExecutorException(taskManagerId));
         } else {
-            return 
taskExecutor.getTaskExecutorGateway().requestFileUploadByName(fileName, 
timeout);
+            return taskExecutor
+                    .getTaskExecutorGateway()
+                    .requestFileUploadByNameAndType(fileName, fileType, 
timeout);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index c68b8fef25e..83bd69dc3d8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -225,17 +226,39 @@ public interface ResourceManagerGateway
 
     /**
      * Request the file upload from the given {@link TaskExecutor} to the 
cluster's {@link
-     * BlobServer}. The corresponding {@link TransientBlobKey} is returned.
+     * BlobServer}. The corresponding {@link TransientBlobKey} is returned. To 
support different
+     * type file upload with name, using {@link
+     * ResourceManager#requestTaskManagerFileUploadByNameAndType} as instead.
      *
      * @param taskManagerId identifying the {@link TaskExecutor} to upload the 
specified file
      * @param fileName name of the file to upload
      * @param timeout for the asynchronous operation
      * @return Future which is completed with the {@link TransientBlobKey} 
after uploading the file
      *     to the {@link BlobServer}.
+     * @deprecated use {@link 
#requestTaskManagerFileUploadByNameAndType(ResourceID, String,
+     *     FileType, Duration)} as instead.
      */
+    @Deprecated
     CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByName(
             ResourceID taskManagerId, String fileName, @RpcTimeout Time 
timeout);
 
+    /**
+     * Request the file upload from the given {@link TaskExecutor} to the 
cluster's {@link
+     * BlobServer}. The corresponding {@link TransientBlobKey} is returned.
+     *
+     * @param taskManagerId identifying the {@link TaskExecutor} to upload the 
specified file
+     * @param fileName name of the file to upload
+     * @param fileType type of the file to upload
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the {@link TransientBlobKey} 
after uploading the file
+     *     to the {@link BlobServer}.
+     */
+    CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByNameAndType(
+            ResourceID taskManagerId,
+            String fileName,
+            FileType fileType,
+            @RpcTimeout Duration timeout);
+
     /**
      * Request log list from the given {@link TaskExecutor}.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java
index 1b77b682c4b..1de440ad592 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.Map;
 
 /**
@@ -67,6 +68,10 @@ public abstract class LeaderRetrievalHandler<T extends 
RestfulGateway>
         this.responseHeaders = Preconditions.checkNotNull(responseHeaders);
     }
 
+    protected Duration getTimeout() {
+        return Duration.ofMillis(timeout.toMilliseconds());
+    }
+
     @Override
     protected void channelRead0(
             ChannelHandlerContext channelHandlerContext, RoutedRequest 
routedRequest) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java
index 39a013a8c2f..98829de51b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters;
+import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -67,8 +68,11 @@ public class TaskManagerCustomLogHandler
     protected CompletableFuture<TransientBlobKey> requestFileUpload(
             ResourceManagerGateway resourceManagerGateway,
             Tuple2<ResourceID, String> taskManagerIdAndFileName) {
-        return resourceManagerGateway.requestTaskManagerFileUploadByName(
-                taskManagerIdAndFileName.f0, taskManagerIdAndFileName.f1, 
timeout);
+        return 
resourceManagerGateway.requestTaskManagerFileUploadByNameAndType(
+                taskManagerIdAndFileName.f0,
+                taskManagerIdAndFileName.f1,
+                FileType.LOG,
+                getTimeout());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2fea646083c..da753f4b8e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1291,14 +1291,27 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
 
     @Override
     public CompletableFuture<TransientBlobKey> requestFileUploadByName(
-            String fileName, Time timeout) {
+            String fileName, Duration timeout) {
+        return requestFileUploadByNameAndType(fileName, FileType.LOG, timeout);
+    }
+
+    @Override
+    public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType(
+            String fileName, FileType fileType, Duration timeout) {
         final String filePath;
-        final String logDir = taskManagerConfiguration.getTaskManagerLogDir();
-        if (StringUtils.isNullOrWhitespaceOnly(logDir)
+        final String baseDir;
+        switch (fileType) {
+            case LOG:
+                baseDir = taskManagerConfiguration.getTaskManagerLogDir();
+                break;
+            default:
+                baseDir = null;
+        }
+        if (StringUtils.isNullOrWhitespaceOnly(baseDir)
                 || StringUtils.isNullOrWhitespaceOnly(fileName)) {
             filePath = null;
         } else {
-            filePath = new File(logDir, new 
File(fileName).getName()).getPath();
+            filePath = new File(baseDir, new 
File(fileName).getName()).getPath();
         }
         return requestFileUploadByFilePath(filePath, fileName);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8379d967e0a..c70e817cb33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.SerializedValue;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -255,7 +256,19 @@ public interface TaskExecutorGateway
      * @return Future which is completed with the {@link TransientBlobKey} of 
the uploaded file.
      */
     CompletableFuture<TransientBlobKey> requestFileUploadByName(
-            String fileName, @RpcTimeout Time timeout);
+            String fileName, @RpcTimeout Duration timeout);
+
+    /**
+     * Requests the file upload of the specified name and file type to the 
cluster's {@link
+     * BlobServer}.
+     *
+     * @param fileName to upload
+     * @param fileType to upload
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the {@link TransientBlobKey} of 
the uploaded file.
+     */
+    CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType(
+            String fileName, FileType fileType, @RpcTimeout Duration timeout);
 
     /**
      * Returns the gateway of Metric Query Service on the TaskManager.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
index 99046ee727b..e0e8ff7af48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
 import org.apache.flink.types.SerializableOptional;
 import org.apache.flink.util.SerializedValue;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -201,10 +202,16 @@ public class TaskExecutorGatewayDecoratorBase implements 
TaskExecutorGateway {
 
     @Override
     public CompletableFuture<TransientBlobKey> requestFileUploadByName(
-            String fileName, Time timeout) {
+            String fileName, Duration timeout) {
         return originalGateway.requestFileUploadByName(fileName, timeout);
     }
 
+    @Override
+    public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType(
+            String fileName, FileType fileType, Duration timeout) {
+        return originalGateway.requestFileUploadByNameAndType(fileName, 
fileType, timeout);
+    }
+
     @Override
     public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(
             Time timeout) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 52ba9d20513..91e13eb34f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -59,6 +59,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.QuadFunction;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -95,6 +96,10 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
     private volatile Function<Tuple2<ResourceID, String>, 
CompletableFuture<TransientBlobKey>>
             requestTaskManagerFileUploadByNameFunction;
 
+    private volatile Function<
+                    Tuple3<ResourceID, String, FileType>, 
CompletableFuture<TransientBlobKey>>
+            requestTaskManagerFileUploadByNameAndTypeFunction;
+
     private volatile Consumer<Tuple2<ResourceID, Throwable>> 
disconnectTaskExecutorConsumer;
 
     private volatile Function<
@@ -187,6 +192,13 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
                 requestTaskManagerFileUploadByNameFunction;
     }
 
+    public void setRequestTaskManagerFileUploadByNameAndTypeFunction(
+            Function<Tuple3<ResourceID, String, FileType>, 
CompletableFuture<TransientBlobKey>>
+                    requestTaskManagerFileUploadByNameAndTypeFunction) {
+        this.requestTaskManagerFileUploadByNameAndTypeFunction =
+                requestTaskManagerFileUploadByNameAndTypeFunction;
+    }
+
     public void setRequestTaskManagerLogListFunction(
             Function<ResourceID, CompletableFuture<Collection<LogInfo>>>
                     requestTaskManagerLogListFunction) {
@@ -441,6 +453,19 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
         }
     }
 
+    @Override
+    public CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByNameAndType(
+            ResourceID taskManagerId, String fileName, FileType fileType, 
Duration timeout) {
+        final Function<Tuple3<ResourceID, String, FileType>, 
CompletableFuture<TransientBlobKey>>
+                function = requestTaskManagerFileUploadByNameAndTypeFunction;
+
+        if (function != null) {
+            return function.apply(Tuple3.of(taskManagerId, fileName, 
fileType));
+        } else {
+            return CompletableFuture.completedFuture(new TransientBlobKey());
+        }
+    }
+
     @Override
     public CompletableFuture<Collection<LogInfo>> requestTaskManagerLogList(
             ResourceID taskManagerId, Time timeout) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 63a721ea90f..7ea5ee1b08e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -49,6 +49,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.QuadFunction;
 import org.apache.flink.util.function.TriFunction;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -313,7 +314,13 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
     @Override
     public CompletableFuture<TransientBlobKey> requestFileUploadByName(
-            String fileName, Time timeout) {
+            String fileName, Duration timeout) {
+        return FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
+    }
+
+    @Override
+    public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType(
+            String fileName, FileType fileType, Duration timeout) {
         return FutureUtils.completedExceptionally(new 
UnsupportedOperationException());
     }
 

Reply via email to