github-advanced-security[bot] commented on code in PR #18413:
URL: https://github.com/apache/druid/pull/18413#discussion_r2289010189


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/kubernetes/DruidClusterComponent.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.kubernetes;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.simulate.K3SResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.utils.PortAllocator;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Component that orchestrates a complete Druid cluster deployment.
+ * Manages all Druid services and their dependencies.
+ */
+public class DruidClusterComponent implements K8sComponent
+{
+  private static final Logger log = new Logger(DruidClusterComponent.class);
+
+  private static final String COMPONENT_NAME = "DruidCluster";
+  private static final String RBAC_MANIFEST_PATH = 
"data/manifests/druid-common/rbac.yaml";
+
+  private final String namespace;
+  private final String druidImage;
+  private final String clusterName;
+  private final List<DruidK8sComponent> druidServices = new ArrayList<>();
+  private K3SResource k3sResource;
+  private final PortAllocator portAllocator;
+
+  public DruidClusterComponent(String namespace, String druidImage, String 
clusterName, K3SResource k3sResource)
+  {
+    this.namespace = namespace;
+    this.druidImage = druidImage;
+    this.clusterName = clusterName;
+    this.k3sResource = k3sResource;
+    this.portAllocator = new PortAllocator(30080, 30100);
+  }
+
+
+  public void addDruidService(DruidK8sComponent service)
+  {
+    String serviceKey = service.getPodLabel();
+    int allocatedPort = portAllocator.allocatePort(serviceKey);
+    service.setAllocatedNodePort(allocatedPort);
+
+    // Share the TestFolder from K3SResource with the DruidK8sComponent
+    if (k3sResource != null && k3sResource.getTestFolder() != null) {
+      service.setTestFolder(k3sResource.getTestFolder());
+    }
+
+    druidServices.add(service);
+    log.info(
+        "Added Druid service %s with allocated NodePort %d",
+        service.getDruidServiceType(), allocatedPort
+    );
+  }
+
+  @Override
+  public void initialize(KubernetesClient client) throws Exception
+  {
+    log.info("Initializing %s...", getComponentName());
+    applyRBACManifests(client);
+    applyDruidClusterManifest(client);
+    log.info("%s initialization completed", getComponentName());
+  }
+
+  @Override
+  public void waitUntilReady(KubernetesClient client) throws Exception
+  {
+    log.info("Waiting for %s to be ready...", getComponentName());
+
+    log.info("Allowing 2 seconds for all Druid services to start and discover 
each other...");
+    Thread.sleep(2000);
+
+    for (DruidK8sComponent service : druidServices) {
+      log.info("Waiting for Druid %s to be ready...", 
service.getDruidServiceType());
+      service.waitUntilReady(client);
+    }
+
+    log.info("%s is ready - all services are healthy!", getComponentName());
+  }
+
+
+  @Override
+  public void cleanup(KubernetesClient client)
+  {
+    log.info("Cleaning up %s...", getComponentName());
+
+    for (DruidK8sComponent service : druidServices) {
+      try {
+        service.cleanup(client);
+        String serviceKey = service.getDruidServiceType() + "-" + clusterName;
+        portAllocator.releasePort(serviceKey);
+      }
+      catch (Exception e) {
+        log.error("Error cleaning up %s: %s", service.getDruidServiceType(), 
e.getMessage());
+      }
+    }
+
+    cleanupRBACResources(client);
+    log.info("%s cleanup completed", getComponentName());
+  }
+
+  @Override
+  public String getComponentName()
+  {
+    return COMPONENT_NAME;
+  }
+
+  @Override
+  public String getNamespace()
+  {
+    return namespace;
+  }
+
+  public List<DruidK8sComponent> getDruidServices()
+  {
+    return new ArrayList<>(druidServices);
+  }
+
+  public Optional<DruidK8sComponent> getCoordinator()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"coordinator".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getBroker()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"broker".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getRouter()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"router".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public List<DruidK8sHistoricalComponent> getHistoricals()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"historical".equals(service.getDruidServiceType()))
+                        .map(service -> (DruidK8sHistoricalComponent) service)
+                        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get all allocated ports for K3S container exposure.
+   */
+  public int[] getAllocatedPorts()
+  {
+    return portAllocator.getAllocatedPorts();
+  }
+
+  /**
+   * Get service to port mapping for debugging.
+   */
+  public Map<String, Integer> getServicePortMapping()
+  {
+    return portAllocator.getServicePortMapping();
+  }
+
+  public Optional<String> getBrokerUrl()
+  {
+    return getBroker().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  public Optional<String> getRouterUrl()
+  {
+    return getRouter().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  /**
+   * Get external coordinator URL for test connectivity.
+   */
+  public Optional<String> getCoordinatorExternalUrl()
+  {
+    return getCoordinator().map(coordinator -> 
coordinator.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external broker URL for test connectivity.
+   */
+  public Optional<String> getBrokerExternalUrl()
+  {
+    return getBroker().map(broker -> 
broker.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external router URL for test connectivity.
+   */
+  public Optional<String> getRouterExternalUrl(KubernetesClient client)

Review Comment:
   ## Useless parameter
   
   The parameter 'client' is never used.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10239)



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/kubernetes/DruidClusterComponent.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.kubernetes;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.simulate.K3SResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.utils.PortAllocator;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Component that orchestrates a complete Druid cluster deployment.
+ * Manages all Druid services and their dependencies.
+ */
+public class DruidClusterComponent implements K8sComponent
+{
+  private static final Logger log = new Logger(DruidClusterComponent.class);
+
+  private static final String COMPONENT_NAME = "DruidCluster";
+  private static final String RBAC_MANIFEST_PATH = 
"data/manifests/druid-common/rbac.yaml";
+
+  private final String namespace;
+  private final String druidImage;
+  private final String clusterName;
+  private final List<DruidK8sComponent> druidServices = new ArrayList<>();
+  private K3SResource k3sResource;
+  private final PortAllocator portAllocator;
+
+  public DruidClusterComponent(String namespace, String druidImage, String 
clusterName, K3SResource k3sResource)
+  {
+    this.namespace = namespace;
+    this.druidImage = druidImage;
+    this.clusterName = clusterName;
+    this.k3sResource = k3sResource;
+    this.portAllocator = new PortAllocator(30080, 30100);
+  }
+
+
+  public void addDruidService(DruidK8sComponent service)
+  {
+    String serviceKey = service.getPodLabel();
+    int allocatedPort = portAllocator.allocatePort(serviceKey);
+    service.setAllocatedNodePort(allocatedPort);
+
+    // Share the TestFolder from K3SResource with the DruidK8sComponent
+    if (k3sResource != null && k3sResource.getTestFolder() != null) {
+      service.setTestFolder(k3sResource.getTestFolder());
+    }
+
+    druidServices.add(service);
+    log.info(
+        "Added Druid service %s with allocated NodePort %d",
+        service.getDruidServiceType(), allocatedPort
+    );
+  }
+
+  @Override
+  public void initialize(KubernetesClient client) throws Exception
+  {
+    log.info("Initializing %s...", getComponentName());
+    applyRBACManifests(client);
+    applyDruidClusterManifest(client);
+    log.info("%s initialization completed", getComponentName());
+  }
+
+  @Override
+  public void waitUntilReady(KubernetesClient client) throws Exception
+  {
+    log.info("Waiting for %s to be ready...", getComponentName());
+
+    log.info("Allowing 2 seconds for all Druid services to start and discover 
each other...");
+    Thread.sleep(2000);
+
+    for (DruidK8sComponent service : druidServices) {
+      log.info("Waiting for Druid %s to be ready...", 
service.getDruidServiceType());
+      service.waitUntilReady(client);
+    }
+
+    log.info("%s is ready - all services are healthy!", getComponentName());
+  }
+
+
+  @Override
+  public void cleanup(KubernetesClient client)
+  {
+    log.info("Cleaning up %s...", getComponentName());
+
+    for (DruidK8sComponent service : druidServices) {
+      try {
+        service.cleanup(client);
+        String serviceKey = service.getDruidServiceType() + "-" + clusterName;
+        portAllocator.releasePort(serviceKey);
+      }
+      catch (Exception e) {
+        log.error("Error cleaning up %s: %s", service.getDruidServiceType(), 
e.getMessage());
+      }
+    }
+
+    cleanupRBACResources(client);
+    log.info("%s cleanup completed", getComponentName());
+  }
+
+  @Override
+  public String getComponentName()
+  {
+    return COMPONENT_NAME;
+  }
+
+  @Override
+  public String getNamespace()
+  {
+    return namespace;
+  }
+
+  public List<DruidK8sComponent> getDruidServices()
+  {
+    return new ArrayList<>(druidServices);
+  }
+
+  public Optional<DruidK8sComponent> getCoordinator()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"coordinator".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getBroker()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"broker".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getRouter()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"router".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public List<DruidK8sHistoricalComponent> getHistoricals()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"historical".equals(service.getDruidServiceType()))
+                        .map(service -> (DruidK8sHistoricalComponent) service)
+                        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get all allocated ports for K3S container exposure.
+   */
+  public int[] getAllocatedPorts()
+  {
+    return portAllocator.getAllocatedPorts();
+  }
+
+  /**
+   * Get service to port mapping for debugging.
+   */
+  public Map<String, Integer> getServicePortMapping()
+  {
+    return portAllocator.getServicePortMapping();
+  }
+
+  public Optional<String> getBrokerUrl()
+  {
+    return getBroker().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  public Optional<String> getRouterUrl()
+  {
+    return getRouter().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  /**
+   * Get external coordinator URL for test connectivity.
+   */
+  public Optional<String> getCoordinatorExternalUrl()
+  {
+    return getCoordinator().map(coordinator -> 
coordinator.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external broker URL for test connectivity.
+   */
+  public Optional<String> getBrokerExternalUrl()
+  {
+    return getBroker().map(broker -> 
broker.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external router URL for test connectivity.
+   */
+  public Optional<String> getRouterExternalUrl(KubernetesClient client)
+  {
+    return getRouter().map(router -> 
router.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Submits a task to the Druid cluster.
+   *
+   * @return the task ID
+   */
+  public String submitTask(String taskJson) throws Exception
+  {
+    Optional<String> coordinatorUrl = getCoordinatorExternalUrl();
+
+    if (coordinatorUrl.isEmpty()) {
+      throw new AssertionError("Coordinator URL not found");
+    }
+    String taskSubmissionUrl = coordinatorUrl.get() + "/druid/indexer/v1/task";
+
+    HttpClient httpClient = HttpClient.newBuilder()
+                                      .connectTimeout(Duration.ofSeconds(30))
+                                      .build();
+
+    HttpRequest taskRequest = HttpRequest.newBuilder()
+                                         .uri(URI.create(taskSubmissionUrl))
+                                         .header("Content-Type", 
"application/json")
+                                         .header("Accept", "application/json")
+                                         
.POST(HttpRequest.BodyPublishers.ofString(taskJson))
+                                         .timeout(Duration.ofSeconds(30))
+                                         .build();
+
+    HttpResponse<String> taskResponse = httpClient.send(taskRequest, 
HttpResponse.BodyHandlers.ofString());
+    Assertions.assertEquals(200, taskResponse.statusCode());
+
+    String responseBody = taskResponse.body();
+    String taskId = null;
+    String[] parts = responseBody.split("\"task\":\\s*\"");
+    if (parts.length > 1) {
+      String afterTask = parts[1];
+      taskId = afterTask.split("\"")[0];
+    }
+
+    Assertions.assertNotNull(taskId, "Should be able to extract task ID from 
response");
+    return taskId;
+  }
+
+  /**
+   * Sets a forever-load rule for the given datasource, replicating to the
+   * given tier with the given replicant count.
+   */
+  public void setLoadRule(String datasource, String tier, int replicants) 
throws Exception
+  {
+    Optional<String> coordinatorUrl = getCoordinatorExternalUrl();
+
+    if (coordinatorUrl.isEmpty()) {
+      throw new AssertionError("Coordinator URL not found");
+    }
+    String rulesUrl = coordinatorUrl.get() + "/druid/coordinator/v1/rules/" + 
datasource;
+
+    String ruleJson = StringUtils.format(
+        "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"%s\":%d}}]",
+        tier, replicants
+    );
+
+    HttpClient httpClient = HttpClient.newBuilder()
+                                      .connectTimeout(Duration.ofSeconds(30))
+                                      .build();
+
+    HttpRequest request = HttpRequest.newBuilder()
+                                     .uri(URI.create(rulesUrl))
+                                     .header("Content-Type", 
"application/json")
+                                     .header("Accept", "application/json")
+                                     
.POST(HttpRequest.BodyPublishers.ofString(ruleJson))
+                                     .timeout(Duration.ofSeconds(30))
+                                     .build();
+
+    HttpResponse<String> response = httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+    Assertions.assertEquals(200, response.statusCode(), "Failed to set load 
rule: " + response.body());
+  }
+
+  private void applyRBACManifests(KubernetesClient client)
+  {
+    try {
+      client.load(new 
FileInputStream(Resources.getFileForResource(RBAC_MANIFEST_PATH)))

Review Comment:
   ## Potential input resource leak
   
   This FileInputStream is not always closed on method exit.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10243)



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/kubernetes/DruidOperatorComponent.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.kubernetes;
+
+import io.fabric8.kubernetes.api.model.NamespaceBuilder;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.simulate.K3SResource;
+import org.testcontainers.k3s.K3sContainer;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Component for deploying the DataInfraHQ Druid Operator using Helm chart
+ * in a Kubernetes test environment.
+ */
+public class DruidOperatorComponent implements K8sComponent
+{
+  private static final Logger log = new Logger(DruidOperatorComponent.class);
+
+  private static final String COMPONENT_NAME = "DruidOperator";
+  private static final String OPERATOR_NAMESPACE = "druid-operator-system";
+  private static final String HELM_RELEASE_NAME = "druid-operator";
+  private static final String HELM_REPO_NAME = "datainfra";
+  private static final String HELM_REPO_URL = "https://charts.datainfra.io";
+  private static final String HELM_CHART_NAME = "datainfra/druid-operator";
+  private static final int POD_READY_CHECK_TIMEOUT_SECONDS = 180;
+
+  private final String druidNamespace;
+  private K3SResource k3sResource;
+
+
+  public DruidOperatorComponent(String druidNamespace)
+  {
+    this.druidNamespace = druidNamespace;
+  }
+
+  public void setK3SResource(K3SResource k3sResource)
+  {
+    this.k3sResource = k3sResource;
+  }
+
+  @Override
+  public void initialize(KubernetesClient client) throws Exception
+  {
+    log.info("Initializing Helm-based Druid Operator in namespace: %s", 
OPERATOR_NAMESPACE);
+
+    createNamespace(client, OPERATOR_NAMESPACE);
+    createNamespace(client, druidNamespace);
+
+    try {
+      log.info("Adding DataInfraHQ Helm repository...");
+      executeHelmCommand("repo", "add", HELM_REPO_NAME, HELM_REPO_URL);
+
+      log.info("Updating Helm repositories...");
+      executeHelmCommand("repo", "update");
+
+      log.info("Installing Druid Operator via Helm...");
+      executeHelmCommand(
+          "install",
+          HELM_RELEASE_NAME,
+          HELM_CHART_NAME,
+          "--namespace", OPERATOR_NAMESPACE,
+          "--create-namespace",
+          "--set", "env.WATCH_NAMESPACE=" + druidNamespace,
+          "--wait",
+          "--timeout", "3m"
+      );
+
+      log.info("✓ Helm installation completed successfully");
+
+    }
+    catch (Exception e) {
+      log.error("Failed to install Druid Operator via Helm: %s", 
e.getMessage());
+      throw new RuntimeException("Helm installation failed", e);
+    }
+  }
+
+  @Override
+  public void waitUntilReady(KubernetesClient client) throws Exception
+  {
+    try {
+      log.info("Waiting for Druid Operator deployment to be ready...");
+
+      String deploymentName = HELM_RELEASE_NAME;
+
+      client.apps().deployments()
+            .inNamespace(OPERATOR_NAMESPACE)
+            .withName(deploymentName)
+            .waitUntilReady(POD_READY_CHECK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      log.info("✓ Druid Operator is ready");
+
+    }
+    catch (Exception e) {
+      log.error("Timeout waiting for %s to be ready", getComponentName());
+      throw e;
+    }
+  }
+
+  @Override
+  public void cleanup(KubernetesClient client)
+  {
+    try {
+      log.info("Cleaning up Helm-based Druid Operator...");
+
+      executeHelmCommand(
+          "uninstall",
+          HELM_RELEASE_NAME,
+          "--namespace", OPERATOR_NAMESPACE
+      );
+
+      client.namespaces().withName(OPERATOR_NAMESPACE).delete();
+
+      log.info("✓ Cleanup completed");
+
+    }
+    catch (Exception e) {
+      log.error("Error during %s cleanup: %s", getComponentName(), 
e.getMessage());
+    }
+  }
+
+  @Override
+  public String getComponentName()
+  {
+    return COMPONENT_NAME;
+  }
+
+  @Override
+  public String getNamespace()
+  {
+    return OPERATOR_NAMESPACE;
+  }
+
+  /**
+   * Execute helm command in the K3s container
+   */
+  private void executeHelmCommand(String... args) throws Exception
+  {
+    log.info("Executing helm command: helm %s", String.join(" ", args));
+
+    K3sContainer k3sContainer = k3sResource.getK3sContainer();
+
+    String[] fullCommand = new String[args.length + 3];
+    fullCommand[0] = "sh";
+    fullCommand[1] = "-c";
+    fullCommand[2] = "export KUBECONFIG=/etc/rancher/k3s/k3s.yaml && helm " + 
String.join(" ", args);
+
+    try {
+      var result = k3sContainer.execInContainer(fullCommand);
+
+      if (result.getExitCode() == 0) {
+        log.info("✓ Helm command succeeded");
+        if (!result.getStdout().trim().isEmpty()) {
+          log.info("Output: %s", result.getStdout().trim());
+        }
+      } else {
+        log.error("✗ Helm command failed with exit code: %d", 
result.getExitCode());
+        if (!result.getStderr().trim().isEmpty()) {
+          log.error("Error: %s", result.getStderr().trim());
+        }
+        throw new RuntimeException("Helm command failed: " + String.join(" ", 
args));
+      }
+    }
+    catch (Exception e) {
+      log.error("Exception executing helm command: %s", e.getMessage());
+      throw e;
+    }
+  }
+
+  private void createNamespace(KubernetesClient client, String namespace)
+  {
+    try {
+      client.namespaces().resource(new NamespaceBuilder()
+                                       .withNewMetadata()
+                                       .withName(namespace)
+                                       .endMetadata()
+                                       .build()).createOrReplace();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CreateOrReplaceable.createOrReplace](1) should be avoided because 
it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10241)



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/kubernetes/DruidClusterComponent.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.kubernetes;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.k8s.simulate.K3SResource;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.utils.PortAllocator;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.FileInputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Component that orchestrates a complete Druid cluster deployment.
+ * Manages all Druid services and their dependencies.
+ */
+public class DruidClusterComponent implements K8sComponent
+{
+  private static final Logger log = new Logger(DruidClusterComponent.class);
+
+  private static final String COMPONENT_NAME = "DruidCluster";
+  private static final String RBAC_MANIFEST_PATH = 
"data/manifests/druid-common/rbac.yaml";
+
+  private final String namespace;
+  private final String druidImage;
+  private final String clusterName;
+  private final List<DruidK8sComponent> druidServices = new ArrayList<>();
+  private K3SResource k3sResource;
+  private final PortAllocator portAllocator;
+
+  public DruidClusterComponent(String namespace, String druidImage, String 
clusterName, K3SResource k3sResource)
+  {
+    this.namespace = namespace;
+    this.druidImage = druidImage;
+    this.clusterName = clusterName;
+    this.k3sResource = k3sResource;
+    this.portAllocator = new PortAllocator(30080, 30100);
+  }
+
+
+  public void addDruidService(DruidK8sComponent service)
+  {
+    String serviceKey = service.getPodLabel();
+    int allocatedPort = portAllocator.allocatePort(serviceKey);
+    service.setAllocatedNodePort(allocatedPort);
+
+    // Share the TestFolder from K3SResource with the DruidK8sComponent
+    if (k3sResource != null && k3sResource.getTestFolder() != null) {
+      service.setTestFolder(k3sResource.getTestFolder());
+    }
+
+    druidServices.add(service);
+    log.info(
+        "Added Druid service %s with allocated NodePort %d",
+        service.getDruidServiceType(), allocatedPort
+    );
+  }
+
+  @Override
+  public void initialize(KubernetesClient client) throws Exception
+  {
+    log.info("Initializing %s...", getComponentName());
+    applyRBACManifests(client);
+    applyDruidClusterManifest(client);
+    log.info("%s initialization completed", getComponentName());
+  }
+
+  @Override
+  public void waitUntilReady(KubernetesClient client) throws Exception
+  {
+    log.info("Waiting for %s to be ready...", getComponentName());
+
+    log.info("Allowing 2 seconds for all Druid services to start and discover 
each other...");
+    Thread.sleep(2000);
+
+    for (DruidK8sComponent service : druidServices) {
+      log.info("Waiting for Druid %s to be ready...", 
service.getDruidServiceType());
+      service.waitUntilReady(client);
+    }
+
+    log.info("%s is ready - all services are healthy!", getComponentName());
+  }
+
+
+  @Override
+  public void cleanup(KubernetesClient client)
+  {
+    log.info("Cleaning up %s...", getComponentName());
+
+    for (DruidK8sComponent service : druidServices) {
+      try {
+        service.cleanup(client);
+        String serviceKey = service.getDruidServiceType() + "-" + clusterName;
+        portAllocator.releasePort(serviceKey);
+      }
+      catch (Exception e) {
+        log.error("Error cleaning up %s: %s", service.getDruidServiceType(), 
e.getMessage());
+      }
+    }
+
+    cleanupRBACResources(client);
+    log.info("%s cleanup completed", getComponentName());
+  }
+
+  @Override
+  public String getComponentName()
+  {
+    return COMPONENT_NAME;
+  }
+
+  @Override
+  public String getNamespace()
+  {
+    return namespace;
+  }
+
+  public List<DruidK8sComponent> getDruidServices()
+  {
+    return new ArrayList<>(druidServices);
+  }
+
+  public Optional<DruidK8sComponent> getCoordinator()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"coordinator".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getBroker()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"broker".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public Optional<DruidK8sComponent> getRouter()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"router".equals(service.getDruidServiceType()))
+                        .findFirst();
+  }
+
+  public List<DruidK8sHistoricalComponent> getHistoricals()
+  {
+    return druidServices.stream()
+                        .filter(service -> 
"historical".equals(service.getDruidServiceType()))
+                        .map(service -> (DruidK8sHistoricalComponent) service)
+                        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get all allocated ports for K3S container exposure.
+   */
+  public int[] getAllocatedPorts()
+  {
+    return portAllocator.getAllocatedPorts();
+  }
+
+  /**
+   * Get service to port mapping for debugging.
+   */
+  public Map<String, Integer> getServicePortMapping()
+  {
+    return portAllocator.getServicePortMapping();
+  }
+
+  public Optional<String> getBrokerUrl()
+  {
+    return getBroker().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  public Optional<String> getRouterUrl()
+  {
+    return getRouter().map(DruidK8sComponent::getServiceUrl);
+  }
+
+  /**
+   * Get external coordinator URL for test connectivity.
+   */
+  public Optional<String> getCoordinatorExternalUrl()
+  {
+    return getCoordinator().map(coordinator -> 
coordinator.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external broker URL for test connectivity.
+   */
+  public Optional<String> getBrokerExternalUrl()
+  {
+    return getBroker().map(broker -> 
broker.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Get external router URL for test connectivity.
+   */
+  public Optional<String> getRouterExternalUrl(KubernetesClient client)
+  {
+    return getRouter().map(router -> 
router.getExternalUrl(k3sResource.getK3sContainer()));
+  }
+
+  /**
+   * Submits a task to the Druid cluster.
+   *
+   * @return the task ID
+   */
+  public String submitTask(String taskJson) throws Exception
+  {
+    Optional<String> coordinatorUrl = getCoordinatorExternalUrl();
+
+    if (coordinatorUrl.isEmpty()) {
+      throw new AssertionError("Coordinator URL not found");
+    }
+    String taskSubmissionUrl = coordinatorUrl.get() + "/druid/indexer/v1/task";
+
+    HttpClient httpClient = HttpClient.newBuilder()
+                                      .connectTimeout(Duration.ofSeconds(30))
+                                      .build();
+
+    HttpRequest taskRequest = HttpRequest.newBuilder()
+                                         .uri(URI.create(taskSubmissionUrl))
+                                         .header("Content-Type", 
"application/json")
+                                         .header("Accept", "application/json")
+                                         
.POST(HttpRequest.BodyPublishers.ofString(taskJson))
+                                         .timeout(Duration.ofSeconds(30))
+                                         .build();
+
+    HttpResponse<String> taskResponse = httpClient.send(taskRequest, 
HttpResponse.BodyHandlers.ofString());
+    Assertions.assertEquals(200, taskResponse.statusCode());
+
+    String responseBody = taskResponse.body();
+    String taskId = null;
+    String[] parts = responseBody.split("\"task\":\\s*\"");
+    if (parts.length > 1) {
+      String afterTask = parts[1];
+      taskId = afterTask.split("\"")[0];
+    }
+
+    Assertions.assertNotNull(taskId, "Should be able to extract task ID from 
response");
+    return taskId;
+  }
+
+  /**
+   * Submits a load rule task to the Druid cluster.
+   *
+   * @return the task ID
+   */
+  public void setLoadRule(String datasource, String tier, int replicants) 
throws Exception
+  {
+    Optional<String> coordinatorUrl = getCoordinatorExternalUrl();
+
+    if (coordinatorUrl.isEmpty()) {
+      throw new AssertionError("Coordinator URL not found");
+    }
+    String rulesUrl = coordinatorUrl.get() + "/druid/coordinator/v1/rules/" + 
datasource;
+
+    String ruleJson = StringUtils.format(
+        "[{\"type\":\"loadForever\",\"tieredReplicants\":{\"%s\":%d}}]",
+        tier, replicants
+    );
+
+    HttpClient httpClient = HttpClient.newBuilder()
+                                      .connectTimeout(Duration.ofSeconds(30))
+                                      .build();
+
+    HttpRequest request = HttpRequest.newBuilder()
+                                     .uri(URI.create(rulesUrl))
+                                     .header("Content-Type", 
"application/json")
+                                     .header("Accept", "application/json")
+                                     
.POST(HttpRequest.BodyPublishers.ofString(ruleJson))
+                                     .timeout(Duration.ofSeconds(30))
+                                     .build();
+
+    HttpResponse<String> response = httpClient.send(request, 
HttpResponse.BodyHandlers.ofString());
+    Assertions.assertEquals(200, response.statusCode(), "Failed to set load 
rule: " + response.body());
+  }
+
+  private void applyRBACManifests(KubernetesClient client)
+  {
+    try {
+      client.load(new 
FileInputStream(Resources.getFileForResource(RBAC_MANIFEST_PATH)))
+            .inNamespace(namespace)
+            .createOrReplace();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CreateOrReplaceable.createOrReplace](1) should be avoided because 
it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10240)



##########
extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/simulate/K3SResource.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.simulate;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.core.DefaultDockerClientConfig;
+import com.github.dockerjava.core.DockerClientImpl;
+import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.TestFolder;
+import org.apache.druid.testing.embedded.TestcontainerResource;
+import org.testcontainers.k3s.K3sContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+import static org.testcontainers.containers.BindMode.READ_WRITE;
+
+/**
+ * A K3s container for use in embedded tests.
+ */
+public class K3SResource extends TestcontainerResource<K3sContainer>
+{
+  private static final Logger log = new Logger(K3SResource.class);
+
+  private static final String K3S_IMAGE = "rancher/k3s:v1.28.8-k3s1";
+  private static final String HELM_VERSION = "v3.13.1";
+  private static final String HELM_PLATFORM = "linux-amd64";
+  
+  private EmbeddedDruidCluster cluster;
+  private KubernetesClient client;
+  private TestFolder testFolder;
+
+  @Override
+  public void beforeStart(EmbeddedDruidCluster cluster)
+  {
+    this.cluster = cluster;
+    this.testFolder = new TestFolder();
+    try {
+      this.testFolder.start();
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Failed to initialize TestFolder", e);
+    }
+  }
+
+
+  @Override
+  protected K3sContainer createContainer()
+  {
+    try {
+      if (testFolder == null) {
+        testFolder = new TestFolder();
+        testFolder.start();
+      }
+      
+      File helmBinary = downloadHelm();
+      
+      File storageDir = testFolder.getOrCreateFolder("druid-storage");
+      testFolder.getOrCreateFolder("druid-storage/segments");
+      testFolder.getOrCreateFolder("druid-storage/segment-cache");
+      testFolder.getOrCreateFolder("druid-storage/metadata");
+      testFolder.getOrCreateFolder("druid-storage/indexing-logs");
+      
+      Integer[] exposedPortRange = generatePortRange(30080, 30100);
+      Integer[] allPorts = new Integer[exposedPortRange.length + 1];
+      allPorts[0] = 6443;
+      System.arraycopy(exposedPortRange, 0, allPorts, 1, 
exposedPortRange.length);
+      
+      K3sContainer container = new 
K3sContainer(DockerImageName.parse(K3S_IMAGE))
+          .withExposedPorts(allPorts)
+          .withCopyFileToContainer(
+              MountableFile.forHostPath(helmBinary.getAbsolutePath()),
+              "/usr/local/bin/helm"
+          )
+          .withFileSystemBind(
+              storageDir.getAbsolutePath(),
+              "/druid/data",
+              READ_WRITE
+          )
+          .withCreateContainerCmdModifier(cmd -> {
+            cmd.withPlatform("linux/amd64");
+          });
+      container.start();
+      
+      container.execInContainer("chmod", "+x", "/usr/local/bin/helm");
+      
+      try {
+        container.execInContainer("helm", "version");
+      }
+      catch (IOException | InterruptedException e) {
+        throw new RuntimeException("Helm verification failed", e);
+      }
+
+      this.client = new KubernetesClientBuilder()
+          .withConfig(Config.fromKubeconfig(container.getKubeConfigYaml()))
+          .build();
+      return container;
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Failed to create K3S container with Helm", 
e);
+    }
+  }
+
+  @Override
+  public void stop()
+  {
+    try {
+      super.stop();
+    }
+    finally {
+      if (testFolder != null) {
+        try {
+          testFolder.stop();
+        }
+        catch (Exception e) {
+          log.error("Failed to cleanup TestFolder: " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * Download Helm binary and save to TestFolder.
+   */
+  private File downloadHelm() throws Exception
+  {
+    String helmUrl = StringUtils.format(
+        "https://get.helm.sh/helm-%s-%s.tar.gz";,
+        HELM_VERSION,
+        HELM_PLATFORM
+    );
+    log.debug("Downloading Helm from: %s", helmUrl);
+
+    File helmFolder = testFolder.getOrCreateFolder("helm");
+    File tarFile = new File(helmFolder, "helm.tar.gz");
+    File helmBinary = new File(helmFolder, "helm");
+    
+    if (helmBinary.exists() && helmBinary.canExecute()) {
+      log.debug("Helm binary already exists: %s", 
helmBinary.getAbsolutePath());
+      return helmBinary;
+    }
+    
+    HttpClient client = HttpClient.newBuilder()
+        .connectTimeout(Duration.ofSeconds(30))
+        .build();
+        
+    HttpRequest request = HttpRequest.newBuilder()
+        .uri(URI.create(helmUrl))
+        .timeout(Duration.ofSeconds(120))
+        .build();
+    
+    HttpResponse<InputStream> response = client.send(request, 
HttpResponse.BodyHandlers.ofInputStream());
+    
+    if (response.statusCode() != 200) {
+      throw new RuntimeException("Failed to download Helm. Status: " + 
response.statusCode());
+    }
+    
+    try (InputStream inputStream = response.body();
+         FileOutputStream outputStream = new FileOutputStream(tarFile)) {
+      inputStream.transferTo(outputStream);
+    }
+
+    extractTarGz(tarFile, helmFolder, HELM_PLATFORM + "/helm", "helm");
+    
+    Set<PosixFilePermission> permissions = Set.of(
+        PosixFilePermission.OWNER_READ,
+        PosixFilePermission.OWNER_WRITE,
+        PosixFilePermission.OWNER_EXECUTE,
+        PosixFilePermission.GROUP_READ,
+        PosixFilePermission.GROUP_EXECUTE,
+        PosixFilePermission.OTHERS_READ,
+        PosixFilePermission.OTHERS_EXECUTE
+    );
+    
+    try {
+      Files.setPosixFilePermissions(helmBinary.toPath(), permissions);
+    }
+    catch (IOException e) {
+      helmBinary.setExecutable(true);
+    }
+    
+    tarFile.delete();
+    log.info("Helm binary downloaded and extracted to: %s", 
helmBinary.getAbsolutePath());
+    return helmBinary;
+  }
+
+  /**
+   * Extract a specific file from a tar.gz archive.
+   */
+  private void extractTarGz(File tarGzFile, File destFolder, String 
sourceEntryPath, String destFileName) throws IOException
+  {
+    try (FileInputStream fis = new FileInputStream(tarGzFile);
+         BufferedInputStream bis = new BufferedInputStream(fis);
+         GZIPInputStream gis = new GZIPInputStream(bis);
+         TarArchiveInputStream tais = new TarArchiveInputStream(gis)) {
+      
+      TarArchiveEntry entry;
+      while ((entry = tais.getNextTarEntry()) != null) {

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [TarArchiveInputStream.getNextTarEntry](1) should be avoided 
because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/10242)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to