github-advanced-security[bot] commented on code in PR #19433:
URL: https://github.com/apache/druid/pull/19433#discussion_r3208035906
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -146,9 +161,28 @@
this.emitter = emitter;
this.currentCapacity = new AtomicInteger(config.getCapacity());
- this.tpe = new ThreadPoolExecutor(currentCapacity.get(),
currentCapacity.get(), 0L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d",
null));
+ if (sharedExecutor == null) {
+ this.tpe = new ThreadPoolExecutor(
+ currentCapacity.get(),
+ currentCapacity.get(),
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Execs.makeThreadFactory("k8s-task-runner-%d", null)
+ );
+ this.ownsExecutor = true;
+ } else {
+ this.tpe = sharedExecutor;
+ this.ownsExecutor = false;
+ }
this.exec = MoreExecutors.listeningDecorator(this.tpe);
- configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()),
this::syncCapacityWithDynamicConfig);
+ if (ownsExecutor && configManager != null) {
+ configManager.addListener(
+ KubernetesTaskRunnerDynamicConfig.CONFIG_KEY,
+ StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()),
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [Thread.getId](1) should be avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11183)
##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/MultipleKubernetesTaskRunnerFactory.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.k8s.overlord;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.inject.Inject;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import org.apache.druid.common.config.ConfigManager;
+import org.apache.druid.guice.IndexingServiceModuleHelper;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient;
+import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
+import
org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory;
+import
org.apache.druid.k8s.overlord.execution.KubernetesTaskRunnerDynamicConfig;
+import
org.apache.druid.k8s.overlord.taskadapter.DynamicConfigPodTemplateSelector;
+import org.apache.druid.k8s.overlord.taskadapter.MultiContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.SingleContainerTaskAdapter;
+import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class MultipleKubernetesTaskRunnerFactory implements
TaskRunnerFactory<TaskRunner>
+{
+ public static final String TYPE_NAME = "multik8s";
+ private final ObjectMapper smileMapper;
+ private final HttpClient httpClient;
+ private final TaskLogs taskLogs;
+ private final ServiceEmitter emitter;
+ private final Supplier<KubernetesTaskRunnerDynamicConfig>
dynamicConfigSupplier;
+ private final ConfigManager configManager;
+ private final MultipleKubernetesTaskRunnerConfig runnerConfig;
+ private final TaskConfig taskConfig;
+ private final StartupLoggingConfig startupLoggingConfig;
+ private final DruidNode druidNode;
+ private final Properties properties;
+ private final DruidKubernetesHttpClientFactory httpClientFactory;
+ private TaskRunner runner;
+
+ @Inject
+ public MultipleKubernetesTaskRunnerFactory(
+ @Json ObjectMapper objectMapper,
+ @Smile ObjectMapper smileMapper,
+ @EscalatedGlobal final HttpClient httpClient,
+ TaskLogs taskLogs,
+ Properties properties,
+ ServiceEmitter emitter,
+ Supplier<KubernetesTaskRunnerDynamicConfig> dynamicConfigSupplier,
+ @Nullable ConfigManager configManager,
+ TaskConfig taskConfig,
+ StartupLoggingConfig startupLoggingConfig,
+ @Self DruidNode druidNode,
+ DruidKubernetesHttpClientFactory httpClientFactory
+ )
+ {
+ this.runnerConfig =
MultipleKubernetesTaskRunnerConfig.fromProperties(objectMapper, properties);
+
+ this.smileMapper = smileMapper;
+ this.httpClient = httpClient;
+ this.taskLogs = taskLogs;
+ this.emitter = emitter;
+ this.dynamicConfigSupplier = dynamicConfigSupplier;
+ this.configManager = configManager;
+ this.taskConfig = taskConfig;
+ this.startupLoggingConfig = startupLoggingConfig;
+ this.druidNode = druidNode;
+ this.properties = properties;
+ this.httpClientFactory = httpClientFactory;
+ }
+
+ @Override
+ public TaskRunner build()
+ {
+ final List<MultipleKubernetesTaskRunnerConfig.KubernetesCluster>
enabledClusters = this.runnerConfig.getClusters()
+
.stream()
+
.filter(cluster -> !cluster.isDisabled())
+
.collect(Collectors.toList());
+
+ if (enabledClusters.isEmpty()) {
+ throw new IllegalArgumentException("At least one task runner must be
enabled");
+ }
+
+ final int totalCapacity = new
KubernetesTaskRunnerEffectiveConfig(this.runnerConfig,
this.dynamicConfigSupplier).getCapacity();
+ final AutoscalableThreadPoolExecutor sharedExecutor = new
AutoscalableThreadPoolExecutor(totalCapacity, this.configManager);
+
+ final List<MultipleKubernetesTaskRunnerDelegate> taskRunners = new
ArrayList<>();
+ for (MultipleKubernetesTaskRunnerConfig.KubernetesCluster
kubernetesCluster : this.runnerConfig.getClusters()) {
+
+ final KubernetesTaskRunnerStaticConfig clusterConfig =
getPerClusterConfiguration(kubernetesCluster);
+ final KubernetesTaskRunnerEffectiveConfig effectiveConfig = new
KubernetesTaskRunnerEffectiveConfig(
+ clusterConfig,
+ this.dynamicConfigSupplier
+ );
+
+ final DruidKubernetesClient client =
createClientForCluster(kubernetesCluster, clusterConfig);
+ final TaskAdapter clusterTaskAdapter = buildTaskAdapter(client,
effectiveConfig);
+ final boolean useOverlordNamespace =
PodTemplateTaskAdapter.TYPE.equals(clusterTaskAdapter.getAdapterType());
+ final DruidKubernetesCachingClient cachingClient =
effectiveConfig.isUseK8sSharedInformers()
+ ?
createCachingClient(client, effectiveConfig)
+ : null;
+
+ final KubernetesPeonClient peonClient = createPeonClient(
+ client,
+ cachingClient,
+ effectiveConfig,
+ useOverlordNamespace
+ );
+
+ final KubernetesTaskRunner clusterRunner = new KubernetesTaskRunner(
+ clusterTaskAdapter,
+ effectiveConfig,
+ peonClient,
+ httpClient,
+ new KubernetesPeonLifecycleFactory(
+ peonClient,
+ taskLogs,
+ smileMapper,
+
effectiveConfig.getLogSaveTimeout().toStandardDuration().getMillis()
+ ),
+ emitter,
+ sharedExecutor,
+ configManager
+ );
+
+ taskRunners.add(
+ new MultipleKubernetesTaskRunnerDelegate(
+ clusterRunner,
+ kubernetesCluster.getName(),
+ kubernetesCluster.isDisabled(),
+ client,
+ cachingClient
+ )
+ );
+ }
+
+ this.runner = new MultipleKubernetesTaskRunner(
+ new KubernetesTaskRunnerEffectiveConfig(
+ this.runnerConfig,
+ this.dynamicConfigSupplier
+ ),
+ runnerConfig.getClusterSelector(),
+ taskRunners,
+ sharedExecutor
+ );
+ return this.runner;
+ }
+
+ @Override
+ public TaskRunner get()
+ {
+ return runner;
+ }
+
+ private DruidKubernetesClient createClientForCluster(
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster,
+ KubernetesTaskRunnerStaticConfig clusterConfig
+ )
+ {
+ Config config;
+ if (cluster.getKubeconfigPath() != null &&
!cluster.getKubeconfigPath().trim().isEmpty()) {
+ config = Config.fromKubeconfig(new File(cluster.getKubeconfigPath()));
+ } else {
+ config = new ConfigBuilder().build();
+ }
+
+ if (clusterConfig.isDisableClientProxy()) {
+ config.setHttpsProxy(null);
+ config.setHttpProxy(null);
+ }
+
+ config.setNamespace(clusterConfig.getNamespace());
+
+ return new DruidKubernetesClient(httpClientFactory, config);
+ }
+
+ protected DruidKubernetesCachingClient createCachingClient(
+ DruidKubernetesClient client,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
+ )
+ {
+ return new DruidKubernetesCachingClient(
+ client,
+ effectiveConfig.getNamespace(),
+
effectiveConfig.getK8sSharedInformerResyncPeriod().toStandardDuration().getMillis()
+ );
+ }
+
+ private KubernetesPeonClient createPeonClient(
+ DruidKubernetesClient client,
+ @Nullable DruidKubernetesCachingClient cachingClient,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig,
+ boolean useOverlordNamespace
+ )
+ {
+ if (cachingClient != null) {
+ return new CachingKubernetesPeonClient(
+ cachingClient,
+ effectiveConfig.getNamespace(),
+ useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "",
+ effectiveConfig.isDebugJobs(),
+ emitter
+ );
+ }
+
+ return new KubernetesPeonClient(
+ client,
+ effectiveConfig.getNamespace(),
+ useOverlordNamespace ? effectiveConfig.getOverlordNamespace() : "",
+ effectiveConfig.isDebugJobs(),
+ emitter
+ );
+ }
+
+ private TaskAdapter buildTaskAdapter(
+ DruidKubernetesClient client,
+ KubernetesTaskRunnerEffectiveConfig effectiveConfig
+ )
+ {
+ final String adapter = properties.getProperty(String.format(
+ Locale.ROOT,
+ "%s.%s.adapter.type",
+ IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX,
+ KubernetesTaskRunnerFactory.TYPE_NAME
+ ));
+
+ if (adapter != null
+ && !MultiContainerTaskAdapter.TYPE.equals(adapter)
+ && effectiveConfig.isSidecarSupport()) {
+ throw new IAE(
+ "Invalid pod adapter [%s], only pod adapter [%s] can be specified
when sidecarSupport is enabled",
+ adapter,
+ MultiContainerTaskAdapter.TYPE
+ );
+ }
+
+ if (MultiContainerTaskAdapter.TYPE.equals(adapter) ||
effectiveConfig.isSidecarSupport()) {
+ return new MultiContainerTaskAdapter(
+ client,
+ effectiveConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper,
+ taskLogs
+ );
+ } else if (PodTemplateTaskAdapter.TYPE.equals(adapter)) {
+ return new PodTemplateTaskAdapter(
+ effectiveConfig,
+ taskConfig,
+ druidNode,
+ smileMapper,
+ taskLogs,
+ new DynamicConfigPodTemplateSelector(properties, effectiveConfig)
+ );
+ } else {
+ return new SingleContainerTaskAdapter(
+ client,
+ effectiveConfig,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ smileMapper,
+ taskLogs
+ );
+ }
+ }
+
+ private KubernetesTaskRunnerStaticConfig getPerClusterConfiguration(
+ MultipleKubernetesTaskRunnerConfig.KubernetesCluster cluster
+ )
+ {
+ return new KubernetesTaskRunnerStaticConfig(
+ cluster.getTaskNamespace(),
+ cluster.getOverlordIdentifier(),
+ this.runnerConfig.getK8sTaskPodNamePrefix(),
+ this.runnerConfig.isDebugJobs(),
+ this.runnerConfig.isSidecarSupport(),
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [KubernetesTaskRunnerStaticConfig.isSidecarSupport](1) should be
avoided because it has been deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11184)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]