This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 525f6bc818eb7f15a19fa81421584920de8f8876 Author: Yu Chen <yuchen.e...@gmail.com> AuthorDate: Mon Jan 8 09:45:27 2024 +0800 [FLINK-33434][runtime] Generalize TaskManager file upload function to support different fileType (baseDir) --- .../runtime/resourcemanager/ResourceManager.java | 18 +++++++++++++--- .../resourcemanager/ResourceManagerGateway.java | 25 +++++++++++++++++++++- .../rest/handler/LeaderRetrievalHandler.java | 5 +++++ .../taskmanager/TaskManagerCustomLogHandler.java | 8 +++++-- .../flink/runtime/taskexecutor/TaskExecutor.java | 21 ++++++++++++++---- .../runtime/taskexecutor/TaskExecutorGateway.java | 15 ++++++++++++- .../TaskExecutorGatewayDecoratorBase.java | 9 +++++++- .../utils/TestingResourceManagerGateway.java | 25 ++++++++++++++++++++++ .../taskexecutor/TestingTaskExecutorGateway.java | 9 +++++++- 9 files changed, 122 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index bd6fb36eda5..400289aa849 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -88,6 +88,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -811,22 +812,33 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> @Override public CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByName( ResourceID taskManagerId, String fileName, Time timeout) { + return requestTaskManagerFileUploadByNameAndType( + taskManagerId, fileName, FileType.LOG, Duration.ofMillis(timeout.toMilliseconds())); + } + + @Override + public CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByNameAndType( + ResourceID taskManagerId, String fileName, FileType fileType, Duration timeout) { log.debug( - "Request upload of file {} from TaskExecutor {}.", + "Request upload of file {} (type: {}) from TaskExecutor {}.", fileName, + fileType, taskManagerId.getStringWithMetadata()); final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId); if (taskExecutor == null) { log.debug( - "Request upload of file {} from unregistered TaskExecutor {}.", + "Request upload of file {} (type: {}) from unregistered TaskExecutor {}.", fileName, + fileType, taskManagerId.getStringWithMetadata()); return FutureUtils.completedExceptionally( new UnknownTaskExecutorException(taskManagerId)); } else { - return taskExecutor.getTaskExecutorGateway().requestFileUploadByName(fileName, timeout); + return taskExecutor + .getTaskExecutorGateway() + .requestFileUploadByNameAndType(fileName, fileType, timeout); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index c68b8fef25e..83bd69dc3d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway; import javax.annotation.Nullable; +import java.time.Duration; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -225,17 +226,39 @@ public interface ResourceManagerGateway /** * Request the file upload from the given {@link TaskExecutor} to the cluster's {@link - * BlobServer}. The corresponding {@link TransientBlobKey} is returned. + * BlobServer}. The corresponding {@link TransientBlobKey} is returned. To support different + * type file upload with name, using {@link + * ResourceManager#requestTaskManagerFileUploadByNameAndType} as instead. * * @param taskManagerId identifying the {@link TaskExecutor} to upload the specified file * @param fileName name of the file to upload * @param timeout for the asynchronous operation * @return Future which is completed with the {@link TransientBlobKey} after uploading the file * to the {@link BlobServer}. + * @deprecated use {@link #requestTaskManagerFileUploadByNameAndType(ResourceID, String, + * FileType, Duration)} as instead. */ + @Deprecated CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByName( ResourceID taskManagerId, String fileName, @RpcTimeout Time timeout); + /** + * Request the file upload from the given {@link TaskExecutor} to the cluster's {@link + * BlobServer}. The corresponding {@link TransientBlobKey} is returned. + * + * @param taskManagerId identifying the {@link TaskExecutor} to upload the specified file + * @param fileName name of the file to upload + * @param fileType type of the file to upload + * @param timeout for the asynchronous operation + * @return Future which is completed with the {@link TransientBlobKey} after uploading the file + * to the {@link BlobServer}. + */ + CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByNameAndType( + ResourceID taskManagerId, + String fileName, + FileType fileType, + @RpcTimeout Duration timeout); + /** * Request log list from the given {@link TaskExecutor}. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java index 1b77b682c4b..1de440ad592 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LeaderRetrievalHandler.java @@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.Map; /** @@ -67,6 +68,10 @@ public abstract class LeaderRetrievalHandler<T extends RestfulGateway> this.responseHeaders = Preconditions.checkNotNull(responseHeaders); } + protected Duration getTimeout() { + return Duration.ofMillis(timeout.toMilliseconds()); + } + @Override protected void channelRead0( ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java index 39a013a8c2f..98829de51b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerCustomLogHandler.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter; import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessageParameters; +import org.apache.flink.runtime.taskexecutor.FileType; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -67,8 +68,11 @@ public class TaskManagerCustomLogHandler protected CompletableFuture<TransientBlobKey> requestFileUpload( ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> taskManagerIdAndFileName) { - return resourceManagerGateway.requestTaskManagerFileUploadByName( - taskManagerIdAndFileName.f0, taskManagerIdAndFileName.f1, timeout); + return resourceManagerGateway.requestTaskManagerFileUploadByNameAndType( + taskManagerIdAndFileName.f0, + taskManagerIdAndFileName.f1, + FileType.LOG, + getTimeout()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 2fea646083c..da753f4b8e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1291,14 +1291,27 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public CompletableFuture<TransientBlobKey> requestFileUploadByName( - String fileName, Time timeout) { + String fileName, Duration timeout) { + return requestFileUploadByNameAndType(fileName, FileType.LOG, timeout); + } + + @Override + public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType( + String fileName, FileType fileType, Duration timeout) { final String filePath; - final String logDir = taskManagerConfiguration.getTaskManagerLogDir(); - if (StringUtils.isNullOrWhitespaceOnly(logDir) + final String baseDir; + switch (fileType) { + case LOG: + baseDir = taskManagerConfiguration.getTaskManagerLogDir(); + break; + default: + baseDir = null; + } + if (StringUtils.isNullOrWhitespaceOnly(baseDir) || StringUtils.isNullOrWhitespaceOnly(fileName)) { filePath = null; } else { - filePath = new File(logDir, new File(fileName).getName()).getPath(); + filePath = new File(baseDir, new File(fileName).getName()).getPath(); } return requestFileUploadByFilePath(filePath, fileName); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 8379d967e0a..c70e817cb33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.SerializedValue; +import java.time.Duration; import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -255,7 +256,19 @@ public interface TaskExecutorGateway * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file. */ CompletableFuture<TransientBlobKey> requestFileUploadByName( - String fileName, @RpcTimeout Time timeout); + String fileName, @RpcTimeout Duration timeout); + + /** + * Requests the file upload of the specified name and file type to the cluster's {@link + * BlobServer}. + * + * @param fileName to upload + * @param fileType to upload + * @param timeout for the asynchronous operation + * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file. + */ + CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType( + String fileName, FileType fileType, @RpcTimeout Duration timeout); /** * Returns the gateway of Metric Query Service on the TaskManager. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java index 99046ee727b..e0e8ff7af48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.SerializedValue; +import java.time.Duration; import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -201,10 +202,16 @@ public class TaskExecutorGatewayDecoratorBase implements TaskExecutorGateway { @Override public CompletableFuture<TransientBlobKey> requestFileUploadByName( - String fileName, Time timeout) { + String fileName, Duration timeout) { return originalGateway.requestFileUploadByName(fileName, timeout); } + @Override + public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType( + String fileName, FileType fileType, Duration timeout) { + return originalGateway.requestFileUploadByNameAndType(fileName, fileType, timeout); + } + @Override public CompletableFuture<SerializableOptional<String>> requestMetricQueryServiceAddress( Time timeout) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 52ba9d20513..91e13eb34f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -59,6 +59,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.QuadFunction; +import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -95,6 +96,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { private volatile Function<Tuple2<ResourceID, String>, CompletableFuture<TransientBlobKey>> requestTaskManagerFileUploadByNameFunction; + private volatile Function< + Tuple3<ResourceID, String, FileType>, CompletableFuture<TransientBlobKey>> + requestTaskManagerFileUploadByNameAndTypeFunction; + private volatile Consumer<Tuple2<ResourceID, Throwable>> disconnectTaskExecutorConsumer; private volatile Function< @@ -187,6 +192,13 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { requestTaskManagerFileUploadByNameFunction; } + public void setRequestTaskManagerFileUploadByNameAndTypeFunction( + Function<Tuple3<ResourceID, String, FileType>, CompletableFuture<TransientBlobKey>> + requestTaskManagerFileUploadByNameAndTypeFunction) { + this.requestTaskManagerFileUploadByNameAndTypeFunction = + requestTaskManagerFileUploadByNameAndTypeFunction; + } + public void setRequestTaskManagerLogListFunction( Function<ResourceID, CompletableFuture<Collection<LogInfo>>> requestTaskManagerLogListFunction) { @@ -441,6 +453,19 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { } } + @Override + public CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByNameAndType( + ResourceID taskManagerId, String fileName, FileType fileType, Duration timeout) { + final Function<Tuple3<ResourceID, String, FileType>, CompletableFuture<TransientBlobKey>> + function = requestTaskManagerFileUploadByNameAndTypeFunction; + + if (function != null) { + return function.apply(Tuple3.of(taskManagerId, fileName, fileType)); + } else { + return CompletableFuture.completedFuture(new TransientBlobKey()); + } + } + @Override public CompletableFuture<Collection<LogInfo>> requestTaskManagerLogList( ResourceID taskManagerId, Time timeout) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 63a721ea90f..7ea5ee1b08e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -49,6 +49,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.QuadFunction; import org.apache.flink.util.function.TriFunction; +import java.time.Duration; import java.util.Collection; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -313,7 +314,13 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { @Override public CompletableFuture<TransientBlobKey> requestFileUploadByName( - String fileName, Time timeout) { + String fileName, Duration timeout) { + return FutureUtils.completedExceptionally(new UnsupportedOperationException()); + } + + @Override + public CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType( + String fileName, FileType fileType, Duration timeout) { return FutureUtils.completedExceptionally(new UnsupportedOperationException()); }