shuai-xu commented on code in PR #8839: URL: https://github.com/apache/incubator-gluten/pull/8839#discussion_r2000147010
########## gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java: ########## @@ -0,0 +1,823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JMXServerOptions; +import org.apache.flink.configuration.RpcOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.core.security.FlinkSecurityManager; +import org.apache.flink.management.jmx.JMXService; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.blob.TaskExecutorBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import 
org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; +import org.apache.flink.runtime.entrypoint.DeterminismEnvelope; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.ReporterSetup; +import org.apache.flink.runtime.metrics.TraceReporterSetup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.security.SecurityConfiguration; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.ConfigurationParserUtils; +import org.apache.flink.runtime.util.EnvironmentInformation; 
+import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Reference; +import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TaskManagerExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.FunctionUtils; + +import io.github.zhztheplayer.velox4j.Velox4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is the executable entry point for the task manager in yarn or standalone mode. It + * constructs the related components (network, I/O manager, memory manager, RPC service, HA service) + * and starts them. 
+ */ +public class TaskManagerRunner implements FatalErrorHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + + private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L; + + private static final int SUCCESS_EXIT_CODE = 0; + @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1; + + private final Thread shutdownHook; + + private final Object lock = new Object(); + + private final Configuration configuration; + + private final Time timeout; + + private final PluginManager pluginManager; + + private final TaskExecutorServiceFactory taskExecutorServiceFactory; + + private final CompletableFuture<Result> terminationFuture; + + @GuardedBy("lock") + private DeterminismEnvelope<ResourceID> resourceId; + + /** Executor used to run future callbacks. */ + @GuardedBy("lock") + private ExecutorService executor; + + @GuardedBy("lock") + private RpcSystem rpcSystem; + + @GuardedBy("lock") + private RpcService rpcService; + + @GuardedBy("lock") + private HighAvailabilityServices highAvailabilityServices; + + @GuardedBy("lock") + private MetricRegistryImpl metricRegistry; + + @GuardedBy("lock") + private BlobCacheService blobCacheService; + + @GuardedBy("lock") + private DeterminismEnvelope<WorkingDirectory> workingDirectory; + + @GuardedBy("lock") + private TaskExecutorService taskExecutorService; + + @GuardedBy("lock") + private boolean shutdown; + + public TaskManagerRunner( + Configuration configuration, + PluginManager pluginManager, + TaskExecutorServiceFactory taskExecutorServiceFactory) + throws Exception { + this.configuration = checkNotNull(configuration); + this.pluginManager = checkNotNull(pluginManager); + this.taskExecutorServiceFactory = checkNotNull(taskExecutorServiceFactory); + + timeout = Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); + + this.terminationFuture = new CompletableFuture<>(); + this.shutdown = false; + + this.shutdownHook = + 
ShutdownHookUtil.addShutdownHook( + () -> this.closeAsync(Result.JVM_SHUTDOWN).join(), + getClass().getSimpleName(), + LOG); + } + + private void startTaskManagerRunnerServices() throws Exception { + synchronized (lock) { Review Comment: This is Flink code; I just copied it and added some code to load libvelox.so. I think we can ignore it for now. ########## gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java: ########## @@ -0,0 +1,823 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JMXServerOptions; +import org.apache.flink.configuration.RpcOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.core.security.FlinkSecurityManager; +import org.apache.flink.management.jmx.JMXService; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.blob.TaskExecutorBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; +import org.apache.flink.runtime.entrypoint.DeterminismEnvelope; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import 
org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.ReporterSetup; +import org.apache.flink.runtime.metrics.TraceReporterSetup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.security.SecurityConfiguration; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.ConfigurationParserUtils; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Reference; +import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TaskManagerExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.FunctionUtils; + +import io.github.zhztheplayer.velox4j.Velox4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + 
+import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is the executable entry point for the task manager in yarn or standalone mode. It + * constructs the related components (network, I/O manager, memory manager, RPC service, HA service) + * and starts them. + */ +public class TaskManagerRunner implements FatalErrorHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + + private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L; + + private static final int SUCCESS_EXIT_CODE = 0; + @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1; + + private final Thread shutdownHook; + + private final Object lock = new Object(); + + private final Configuration configuration; + + private final Time timeout; + + private final PluginManager pluginManager; + + private final TaskExecutorServiceFactory taskExecutorServiceFactory; + + private final CompletableFuture<Result> terminationFuture; + + @GuardedBy("lock") + private DeterminismEnvelope<ResourceID> resourceId; + + /** Executor used to run future callbacks. 
*/ + @GuardedBy("lock") + private ExecutorService executor; + + @GuardedBy("lock") + private RpcSystem rpcSystem; + + @GuardedBy("lock") + private RpcService rpcService; + + @GuardedBy("lock") + private HighAvailabilityServices highAvailabilityServices; + + @GuardedBy("lock") + private MetricRegistryImpl metricRegistry; + + @GuardedBy("lock") + private BlobCacheService blobCacheService; + + @GuardedBy("lock") + private DeterminismEnvelope<WorkingDirectory> workingDirectory; + + @GuardedBy("lock") + private TaskExecutorService taskExecutorService; + + @GuardedBy("lock") + private boolean shutdown; + + public TaskManagerRunner( + Configuration configuration, + PluginManager pluginManager, + TaskExecutorServiceFactory taskExecutorServiceFactory) + throws Exception { + this.configuration = checkNotNull(configuration); + this.pluginManager = checkNotNull(pluginManager); + this.taskExecutorServiceFactory = checkNotNull(taskExecutorServiceFactory); + + timeout = Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); + + this.terminationFuture = new CompletableFuture<>(); + this.shutdown = false; + + this.shutdownHook = + ShutdownHookUtil.addShutdownHook( + () -> this.closeAsync(Result.JVM_SHUTDOWN).join(), + getClass().getSimpleName(), + LOG); + } + + private void startTaskManagerRunnerServices() throws Exception { + synchronized (lock) { + rpcSystem = RpcSystem.load(configuration); + + this.executor = + Executors.newScheduledThreadPool( + Hardware.getNumberCPUCores(), + new ExecutorThreadFactory("taskmanager-future")); + + highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + configuration, + executor, + AddressResolution.NO_ADDRESS_RESOLUTION, + rpcSystem, + this); + + JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT)); + + rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem); + + this.resourceId = + getTaskManagerResourceID( + configuration, 
rpcService.getAddress(), rpcService.getPort()); + + this.workingDirectory = + ClusterEntrypointUtils.createTaskManagerWorkingDirectory( + configuration, resourceId); + + LOG.info("Using working directory: {}", workingDirectory); + + HeartbeatServices heartbeatServices = + HeartbeatServices.fromConfiguration(configuration); + + metricRegistry = + new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration( + configuration, + rpcSystem.getMaximumMessageSizeInBytes(configuration)), + ReporterSetup.fromConfiguration(configuration, pluginManager), + TraceReporterSetup.fromConfiguration(configuration, pluginManager)); + + final RpcService metricQueryServiceRpcService = + MetricUtils.startRemoteMetricsRpcService( + configuration, + rpcService.getAddress(), + configuration.get(TaskManagerOptions.BIND_HOST), + rpcSystem); + metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap()); + + blobCacheService = + BlobUtils.createBlobCacheService( + configuration, + Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()), + highAvailabilityServices.createBlobStore(), + null); + + final ExternalResourceInfoProvider externalResourceInfoProvider = + ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig( + configuration, pluginManager); + + final DelegationTokenReceiverRepository delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration, pluginManager); + + taskExecutorService = + taskExecutorServiceFactory.createTaskExecutor( + this.configuration, + this.resourceId.unwrap(), + rpcService, + highAvailabilityServices, + heartbeatServices, + metricRegistry, + blobCacheService, + false, + externalResourceInfoProvider, + workingDirectory.unwrap(), + this, + delegationTokenReceiverRepository); + + handleUnexpectedTaskExecutorServiceTermination(); + + MemoryLogger.startIfConfigured( + LOG, configuration, terminationFuture.thenAccept(ignored -> {})); + } + } + + @GuardedBy("lock") + 
private void handleUnexpectedTaskExecutorServiceTermination() { + taskExecutorService + .getTerminationFuture() + .whenComplete( + (unused, throwable) -> { + synchronized (lock) { + if (!shutdown) { + onFatalError( + new FlinkException( + "Unexpected termination of the TaskExecutor.", + throwable)); + } + } + }); + } + + // -------------------------------------------------------------------------------------------- + // Lifecycle management + // -------------------------------------------------------------------------------------------- + + public void start() throws Exception { + synchronized (lock) { + startTaskManagerRunnerServices(); + taskExecutorService.start(); + } + } + + public void close() throws Exception { + try { + closeAsync().get(); + } catch (ExecutionException e) { + ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e)); + } + } + + public CompletableFuture<Result> closeAsync() { + return closeAsync(Result.SUCCESS); + } + + private CompletableFuture<Result> closeAsync(Result terminationResult) { + synchronized (lock) { + // remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, this.getClass().getSimpleName(), LOG); + + if (shutdown) { + return terminationFuture; + } + + final CompletableFuture<Void> taskManagerTerminationFuture; + if (taskExecutorService != null) { + taskManagerTerminationFuture = taskExecutorService.closeAsync(); + } else { + taskManagerTerminationFuture = FutureUtils.completedVoidFuture(); + } + + final CompletableFuture<Void> serviceTerminationFuture = + FutureUtils.composeAfterwards( + taskManagerTerminationFuture, this::shutDownServices); + + final CompletableFuture<Void> workingDirCleanupFuture = + FutureUtils.runAfterwards( + serviceTerminationFuture, () -> deleteWorkingDir(terminationResult)); + + final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture; + + if (rpcSystem != null) { + rpcSystemClassLoaderCloseFuture = + 
FutureUtils.runAfterwards(workingDirCleanupFuture, rpcSystem::close); + } else { + rpcSystemClassLoaderCloseFuture = FutureUtils.completedVoidFuture(); + } + + rpcSystemClassLoaderCloseFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(terminationResult); + } + }); + + shutdown = true; + return terminationFuture; + } + } + + private void deleteWorkingDir(Result terminationResult) throws IOException { + synchronized (lock) { + if (workingDirectory != null) { + if (!workingDirectory.isDeterministic() || terminationResult == Result.SUCCESS) { + workingDirectory.unwrap().delete(); + } + } + } + } + + private CompletableFuture<Void> shutDownServices() { + synchronized (lock) { Review Comment: as above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
