This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7a65676b8 Add embedded test for kubernetes task runner using Druid 
operator (#18413)
aa7a65676b8 is described below

commit aa7a65676b890116f02d703bf46f20c77b084ba2
Author: Uddeshya Singh <[email protected]>
AuthorDate: Sun Aug 24 23:21:54 2025 +0530

    Add embedded test for kubernetes task runner using Druid operator (#18413)
    
    Changes:
    - Add `K3sClusterResourceWithOperator`
    - Add `KubernetesClusterWithOperatorDockerTest`
    - Add operator specific manifest template
---
 embedded-tests/pom.xml                             |  16 ++
 .../testing/embedded/k8s/K3sClusterResource.java   |  96 ++++++--
 .../k8s/K3sClusterWithOperatorResource.java        | 271 +++++++++++++++++++++
 .../testing/embedded/k8s/K3sDruidService.java      |  28 ++-
 .../embedded/k8s/KubernetesClusterDockerTest.java  |   2 +-
 ...> KubernetesClusterWithOperatorDockerTest.java} |  35 ++-
 .../manifests/druid-operator-namespace.yaml        |   4 +
 .../resources/manifests/druid-operator-rbac.yaml   |  30 +++
 .../manifests/druid-service-with-operator.yaml     | 114 +++++++++
 .../kubernetes-overlord-extensions/pom.xml         |   1 -
 .../druid/indexing/common/task/AbstractTask.java   |   2 +-
 11 files changed, 553 insertions(+), 46 deletions(-)

diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index a76849f02ef..f464b3a9ff2 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -142,6 +142,10 @@
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
@@ -399,6 +403,18 @@
       <version>6.2.12</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java-api</artifactId>
+      <version>19.0.0</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
index 8a28398b751..13c9201976b 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
@@ -73,6 +73,7 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
 
   public static final String DRUID_NAMESPACE = "druid";
   private static final String NAMESPACE_MANIFEST = 
"manifests/druid-namespace.yaml";
+  private static final String DEFAULT_SERVICE_MANIFEST = 
"manifests/druid-service.yaml";
 
   private static final String COMMON_CONFIG_MAP = "druid-common-props";
   private static final String SERVICE_CONFIG_MAP = "druid-%s-props";
@@ -84,13 +85,20 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
 
   private KubernetesClient client;
   private String druidImageName;
+  private String druidManifestTemplate = DEFAULT_SERVICE_MANIFEST;
 
   private final Closer closer = Closer.create();
 
   public K3sClusterResource()
   {
     // Add the namespace manifest
-    manifestFiles.add(Resources.getFileForResource(NAMESPACE_MANIFEST));
+    addManifestResource(NAMESPACE_MANIFEST);
+  }
+
+  public K3sClusterResource addManifestResource(String manifestResourceName)
+  {
+    manifestFiles.add(Resources.getFileForResource(manifestResourceName));
+    return this;
   }
 
   public K3sClusterResource addService(K3sDruidService service)
@@ -109,11 +117,20 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
    * Uses the Docker test image specified by the system property
    * {@link DruidContainerResource#PROPERTY_TEST_IMAGE} for the Druid pods.
    */
-  public K3sClusterResource usingTestImage()
+  public K3sClusterResource usingDruidTestImage()
   {
     return usingDruidImage(DruidContainerResource.getTestDruidImageName());
   }
 
+  /**
+   * Uses the given resource template to create manifests for Druid services.
+   */
+  public K3sClusterResource usingDruidManifestTemplate(String resourceName)
+  {
+    this.druidManifestTemplate = resourceName;
+    return this;
+  }
+
   @Override
   protected K3sContainer createContainer()
   {
@@ -129,6 +146,9 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
         // Druid service is discoverable with the Druid service discovery
         portBindings.add(port + ":" + port);
       }
+      int servicePort = service.getServicePort();
+      container.addExposedPorts(servicePort);
+      portBindings.add(servicePort + ":" + servicePort);
     }
 
     container.setPortBindings(portBindings);
@@ -138,37 +158,59 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
   @Override
   public void onStarted(EmbeddedDruidCluster cluster)
   {
-    client = new KubernetesClientBuilder()
-        .withConfig(Config.fromKubeconfig(getContainer().getKubeConfigYaml()))
-        .build();
-    closer.register(client);
-
+    initKubernetesClient();
     loadLocalDockerImageIntoContainer(druidImageName, cluster.getTestFolder());
-
     manifestFiles.forEach(this::applyManifest);
+    initializeDruidServices(cluster);
+    waitUntilPodsAreReady(DRUID_NAMESPACE);
+    waitUntilServicesAreHealthy();
+  }
 
-    // Create common config map
-    final Properties commonProperties = new Properties();
-    commonProperties.putAll(cluster.getCommonProperties());
-    commonProperties.remove("druid.extensions.modulesForEmbeddedTests");
-    applyConfigMap(
-        newConfigMap(COMMON_CONFIG_MAP, commonProperties, 
"common.runtime.properties")
-    );
-
-    // Create config maps and manifests for each service
+  protected void initializeDruidServices(EmbeddedDruidCluster cluster)
+  {
+    createConfigMapForCommonProperties(cluster);
     for (K3sDruidService druidService : services) {
       final String serviceConfigMap = StringUtils.format(SERVICE_CONFIG_MAP, 
druidService.getName());
       applyConfigMap(
           newConfigMap(serviceConfigMap, druidService.getProperties(), 
"runtime.properties")
       );
-      applyManifest(druidService);
+      applyManifestYaml(druidService, createManifestYaml(druidService));
     }
+  }
 
-    // Wait for all pods to be ready and services to be healthy
-    
client.pods().inNamespace(DRUID_NAMESPACE).resources().forEach(this::waitUntilPodIsReady);
+  protected void waitUntilServicesAreHealthy()
+  {
     services.forEach(this::waitUntilServiceIsHealthy);
   }
 
+  protected List<K3sDruidService> getServices()
+  {
+    return services;
+  }
+
+  private void createConfigMapForCommonProperties(EmbeddedDruidCluster cluster)
+  {
+    final Properties commonProperties = new Properties();
+    commonProperties.putAll(cluster.getCommonProperties());
+    commonProperties.remove("druid.extensions.modulesForEmbeddedTests");
+    applyConfigMap(
+        newConfigMap(COMMON_CONFIG_MAP, commonProperties, 
"common.runtime.properties")
+    );
+  }
+
+  private void initKubernetesClient()
+  {
+    client = new KubernetesClientBuilder()
+        .withConfig(Config.fromKubeconfig(getContainer().getKubeConfigYaml()))
+        .build();
+    closer.register(client);
+  }
+
+  protected void waitUntilPodsAreReady(String namespace)
+  {
+    
client.pods().inNamespace(namespace).resources().forEach(this::waitUntilPodIsReady);
+  }
+
   @Override
   public void stop()
   {
@@ -251,13 +293,19 @@ public class K3sClusterResource extends 
TestcontainerResource<K3sContainer>
   }
 
   /**
-   * Creates and applies the manifest for the given Druid service.
+   * Creates the manifest YAML for the given Druid service.
    */
-  private void applyManifest(K3sDruidService service)
+  protected String createManifestYaml(K3sDruidService service)
   {
-    final String manifestYaml = service.createManifestYaml(druidImageName);
-    log.info("Applying manifest for service[%s]: %s", service.getName(), 
manifestYaml);
+    return service.createManifestYaml(druidManifestTemplate, druidImageName);
+  }
 
+  /**
+   * Applies the given service manifest YAML to the K3s cluster.
+   */
+  protected void applyManifestYaml(K3sDruidService service, String 
manifestYaml)
+  {
+    log.info("Applying manifest for service[%s]: %s", service.getName(), 
manifestYaml);
     try (ByteArrayInputStream bis = new 
ByteArrayInputStream(manifestYaml.getBytes(StandardCharsets.UTF_8))) {
       client.load(bis).inNamespace(DRUID_NAMESPACE).serverSideApply();
       log.info("Applied manifest for service[%s]", service.getName());
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
new file mode 100644
index 00000000000..20af2365061
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.k8s;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+public class K3sClusterWithOperatorResource extends K3sClusterResource
+{
+  private static final Logger log = new 
Logger(K3sClusterWithOperatorResource.class);
+  private static final String RBAC_MANIFEST = 
"manifests/druid-operator-rbac.yaml";
+  private static final String OPERATOR_NAMESPACE_MANIFEST = 
"manifests/druid-operator-namespace.yaml";
+  private static final String OPERATOR_NAMESPACE = "druid-operator-system";
+  private static final String HELM_RELEASE_NAME = "druid-operator";
+  private static final String HELM_REPO_NAME = "datainfra";
+  private static final String HELM_REPO_URL = "https://charts.datainfra.io";;
+  private static final String HELM_CHART_NAME = "datainfra/druid-operator";
+  private static final String HELM_VERSION = "v3.13.1";
+  private static final String HELM_PLATFORM = "linux-amd64";
+  private static final String HELM_MOUNT_PATH = "/usr/local/bin/helm";
+
+
+  public K3sClusterWithOperatorResource()
+  {
+    super();
+    addManifestResource(RBAC_MANIFEST);
+    addManifestResource(OPERATOR_NAMESPACE_MANIFEST);
+  }
+
+  @Override
+  protected void initializeDruidServices(EmbeddedDruidCluster cluster)
+  {
+    installHelm(cluster);
+    setupOperatorWithHelm();
+    waitUntilPodsAreReady(OPERATOR_NAMESPACE);
+    for (K3sDruidService druidService : getServices()) {
+      applyManifestYaml(druidService, createManifestYaml(druidService, 
cluster));
+    }
+  }
+
+  /**
+   * Installs Helm binary in the K3s cluster.
+   */
+  private void installHelm(EmbeddedDruidCluster cluster)
+  {
+    try {
+      File helmBinary = downloadHelmBinary(cluster);
+      this.getContainer().copyFileToContainer(
+          MountableFile.forHostPath(helmBinary.getAbsolutePath()),
+          HELM_MOUNT_PATH
+      );
+      this.getContainer().execInContainer("chmod", "+x", HELM_MOUNT_PATH);
+      log.info("Helm binary installed to /usr/local/bin/helm");
+    }
+    catch (Exception e) {
+      log.error(e, "Failed to download or install Helm binary");
+      throw new RuntimeException("Helm installation failed", e);
+    }
+  }
+
+  private File downloadHelmBinary(EmbeddedDruidCluster cluster) throws 
Exception
+  {
+    String helmUrl = StringUtils.format(
+        "https://get.helm.sh/helm-%s-%s.tar.gz";,
+        HELM_VERSION,
+        HELM_PLATFORM
+    );
+    log.info("Downloading Helm from URL[%s].", helmUrl);
+
+    File helmFolder = cluster.getTestFolder().getOrCreateFolder("helm");
+    File tarFile = new File(helmFolder, "helm.tar.gz");
+    File helmBinary = new File(helmFolder, "helm");
+
+    if (helmBinary.exists() && helmBinary.canExecute()) {
+      log.info("Helm binary already exists at path[%s]", 
helmBinary.getAbsolutePath());
+      return helmBinary;
+    }
+
+    HttpClient client = HttpClient.newBuilder()
+                                  .connectTimeout(Duration.ofSeconds(30))
+                                  .build();
+
+    HttpRequest request = HttpRequest.newBuilder()
+                                     .uri(URI.create(helmUrl))
+                                     .timeout(Duration.ofSeconds(120))
+                                     .build();
+
+    HttpResponse<InputStream> response = client.send(request, 
HttpResponse.BodyHandlers.ofInputStream());
+
+    if (response.statusCode() != 200) {
+      throw new RuntimeException("Failed to download Helm. Status: " + 
response.statusCode());
+    }
+
+    try (InputStream inputStream = response.body();
+         FileOutputStream outputStream = new FileOutputStream(tarFile)) {
+      inputStream.transferTo(outputStream);
+    }
+
+    extractTarGz(tarFile, helmFolder, HELM_PLATFORM + "/helm", "helm");
+
+    Set<PosixFilePermission> permissions = Set.of(
+        PosixFilePermission.OWNER_READ,
+        PosixFilePermission.OWNER_WRITE,
+        PosixFilePermission.OWNER_EXECUTE,
+        PosixFilePermission.GROUP_READ,
+        PosixFilePermission.GROUP_EXECUTE,
+        PosixFilePermission.OTHERS_READ,
+        PosixFilePermission.OTHERS_EXECUTE
+    );
+
+    try {
+      Files.setPosixFilePermissions(helmBinary.toPath(), permissions);
+    }
+    catch (IOException e) {
+      helmBinary.setExecutable(true);
+    }
+
+    tarFile.delete();
+    log.info("Helm binary downloaded and extracted to: %s", 
helmBinary.getAbsolutePath());
+    return helmBinary;
+  }
+
+  /**
+   * Extract a specific file from a tar.gz archive.
+   */
+  private void extractTarGz(File tarGzFile, File destFolder, String 
sourceEntryPath, String destFileName)
+      throws IOException
+  {
+    try (FileInputStream fis = new FileInputStream(tarGzFile);
+         BufferedInputStream bis = new BufferedInputStream(fis);
+         GZIPInputStream gis = new GZIPInputStream(bis);
+         TarArchiveInputStream tais = new TarArchiveInputStream(gis)) {
+
+      TarArchiveEntry entry;
+      while ((entry = tais.getNextTarEntry()) != null) {
+        if (entry.getName().equals(sourceEntryPath)) {
+          File destFile = new File(destFolder, destFileName);
+          try (FileOutputStream fos = new FileOutputStream(destFile)) {
+            byte[] buffer = new byte[8192];
+            int len;
+            while ((len = tais.read(buffer)) != -1) {
+              fos.write(buffer, 0, len);
+            }
+          }
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Updates helm repository and installs the druid operator chart in the K3s 
cluster.
+   */
+  private void setupOperatorWithHelm()
+  {
+    try {
+      executeHelmCommand("repo", "add", HELM_REPO_NAME, HELM_REPO_URL);
+      executeHelmCommand("repo", "update");
+      executeHelmCommand(
+          "install",
+          HELM_RELEASE_NAME,
+          HELM_CHART_NAME,
+          "--namespace", OPERATOR_NAMESPACE,
+          "--create-namespace",
+          "--set", "env.WATCH_NAMESPACE=" + DRUID_NAMESPACE,
+          "--wait",
+          "--timeout", "3m"
+      );
+    }
+    catch (Exception e) {
+      log.error("Failed to set up Druid Operator with Helm: %s", 
e.getMessage());
+      throw new RuntimeException("Failed to execute helm command", e);
+    }
+  }
+
+  /**
+   * Executes a Helm command in the K3s cluster container.
+   */
+  private void executeHelmCommand(String... args) throws Exception
+  {
+    String[] fullCommand = new String[args.length + 3];
+    fullCommand[0] = "sh";
+    fullCommand[1] = "-c";
+    fullCommand[2] = "export KUBECONFIG=/etc/rancher/k3s/k3s.yaml && helm " + 
String.join(" ", args);
+
+    try {
+      log.info("Executing command[%s]", 
org.apache.commons.lang3.StringUtils.join(fullCommand, " "));
+      Container.ExecResult result = 
getContainer().execInContainer(fullCommand);
+
+      if (result.getExitCode() != 0) {
+        log.error("Helm command failed with exit code: %d", 
result.getExitCode());
+        if (!result.getStderr().trim().isEmpty()) {
+          log.error("Error: %s", result.getStderr().trim());
+        }
+        throw new RuntimeException("Helm command failed: " + String.join(" ", 
args));
+      }
+    }
+    catch (Exception e) {
+      log.error("Exception executing helm command: %s", e.getMessage());
+      throw e;
+    }
+  }
+
+  private String createManifestYaml(K3sDruidService service, 
EmbeddedDruidCluster cluster)
+  {
+    String manifestYaml = super.createManifestYaml(service);
+    manifestYaml = StringUtils.replace(
+        manifestYaml,
+        "${commonRuntimeProperties}",
+        buildPropertiesString(cluster.getCommonProperties(), 4)
+    );
+    manifestYaml = StringUtils.replace(
+        manifestYaml,
+        "${nodeRuntimeProperties}",
+        buildPropertiesString(service.getRuntimeProperties(), 8)
+    );
+    return manifestYaml;
+  }
+
+  /**
+   * Builds a properties string to be used in the manifest.yaml file 
supporting a uniform indentation.
+   */
+  private static String buildPropertiesString(Properties properties, int 
indentationSpaces)
+  {
+    final StringBuilder builder = new StringBuilder();
+    String indentation = " ".repeat(indentationSpaces);
+    for (String key : properties.stringPropertyNames()) {
+      
builder.append(indentation).append(key).append("=").append(properties.getProperty(key)).append("\n");
+    }
+    return builder.toString();
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
index 774beb69a27..9a78d9accca 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
@@ -36,20 +36,36 @@ import java.util.Properties;
  */
 public class K3sDruidService
 {
-  private static final String MANIFEST_TEMPLATE = 
"manifests/druid-service.yaml";
-
   private final DruidCommand command;
   private final Properties properties;
+  private int servicePort;
 
   public K3sDruidService(DruidCommand command)
   {
     this.command = command;
     this.properties = new Properties();
+    this.servicePort = command.getExposedPorts()[0];
 
     addProperty("druid.host", EmbeddedHostname.containerFriendly().toString());
     command.getDefaultProperties().forEach(properties::setProperty);
   }
 
+  public K3sDruidService usingPort(int port)
+  {
+    this.servicePort = port;
+    return this;
+  }
+
+  public int getServicePort()
+  {
+    return servicePort;
+  }
+
+  public Properties getRuntimeProperties()
+  {
+    return properties;
+  }
+
   public String getName()
   {
     return command.getName().toLowerCase(Locale.ROOT);
@@ -63,16 +79,16 @@ public class K3sDruidService
   /**
    * Creates a manifest YAML String for this service.
    */
-  public String createManifestYaml(String druidImage)
+  public String createManifestYaml(String manifestTemplateResource, String 
druidImage)
   {
     try {
       final String template = Files.readString(
-          Resources.getFileForResource(MANIFEST_TEMPLATE).toPath()
+          Resources.getFileForResource(manifestTemplateResource).toPath()
       );
 
       String manifest = StringUtils.replace(template, "${service}", getName());
       manifest = StringUtils.replace(manifest, "${command}", 
command.getName());
-      manifest = StringUtils.replace(manifest, "${port}", 
String.valueOf(command.getExposedPorts()[0]));
+      manifest = StringUtils.replace(manifest, "${port}", 
String.valueOf(servicePort));
       manifest = StringUtils.replace(manifest, "${image}", druidImage);
       manifest = StringUtils.replace(manifest, "${serviceFolder}", 
getServicePropsFolder());
 
@@ -99,7 +115,7 @@ public class K3sDruidService
     return StringUtils.format(
         "http://%s:%s/status/health";,
         EmbeddedHostname.containerFriendly().toString(),
-        command.getExposedPorts()[0]
+        String.valueOf(servicePort)
     );
   }
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
index e7f618a8c13..c44796ca0f2 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
@@ -40,7 +40,7 @@ public class KubernetesClusterDockerTest extends 
IngestionSmokeTest implements L
 
     // Create a K3s cluster with all the required services
     final K3sClusterResource k3sCluster = new K3sClusterResource()
-        .usingTestImage()
+        .usingDruidTestImage()
         .addService(new K3sDruidService(DruidCommand.Server.COORDINATOR))
         .addService(new K3sDruidService(DruidCommand.Server.OVERLORD))
         .addService(new K3sDruidService(DruidCommand.Server.HISTORICAL))
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
similarity index 72%
copy from 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
copy to 
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
index e7f618a8c13..748dfd55041 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
@@ -28,40 +28,49 @@ import org.junit.jupiter.api.BeforeEach;
 
 /**
  * Runs some basic ingestion tests against latest image Druid containers 
running
- * on a K3s cluster.
+ * on a K3s cluster with druid-operator.
  */
-public class KubernetesClusterDockerTest extends IngestionSmokeTest implements 
LatestImageDockerTest
+public class KubernetesClusterWithOperatorDockerTest extends 
IngestionSmokeTest implements LatestImageDockerTest
 {
+  private static final String MANIFEST_TEMPLATE = 
"manifests/druid-service-with-operator.yaml";
+
   @Override
   protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
   {
     final K3sDruidService brokerService = new 
K3sDruidService(DruidCommand.Server.BROKER)
-        .addProperty("druid.sql.planner.metadataRefreshPeriod", "PT1s");
+        .addProperty("druid.sql.planner.metadataRefreshPeriod", "PT1s")
+        .usingPort(30082);
+
+    final K3sDruidService overlordService = new 
K3sDruidService(DruidCommand.Server.OVERLORD)
+        .addProperty("druid.indexer.runner.type", "k8s")
+        .addProperty("druid.indexer.runner.namespace", "druid")
+        .addProperty("druid.indexer.runner.capacity", "4")
+        .usingPort(30090);
 
-    // Create a K3s cluster with all the required services
-    final K3sClusterResource k3sCluster = new K3sClusterResource()
-        .usingTestImage()
-        .addService(new K3sDruidService(DruidCommand.Server.COORDINATOR))
-        .addService(new K3sDruidService(DruidCommand.Server.OVERLORD))
-        .addService(new K3sDruidService(DruidCommand.Server.HISTORICAL))
-        .addService(new K3sDruidService(DruidCommand.Server.MIDDLE_MANAGER))
-        .addService(new K3sDruidService(DruidCommand.Server.ROUTER))
+    final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource()
+        .usingDruidTestImage()
+        .usingDruidManifestTemplate(MANIFEST_TEMPLATE)
+        .addService(new 
K3sDruidService(DruidCommand.Server.COORDINATOR).usingPort(30081))
+        .addService(overlordService)
+        .addService(new 
K3sDruidService(DruidCommand.Server.HISTORICAL).usingPort(30083))
+        .addService(new 
K3sDruidService(DruidCommand.Server.ROUTER).usingPort(30088))
         .addService(brokerService);
 
     // Add an EmbeddedOverlord and EmbeddedBroker to use their client and 
mapper bindings.
     overlord.addProperty("druid.plaintextPort", "7090");
     broker.addProperty("druid.plaintextPort", "7082");
-
+    
     return cluster
         .useContainerFriendlyHostname()
         .addResource(k3sCluster)
         .addServer(overlord)
         .addServer(broker)
         .addServer(eventCollector)
+        .addCommonProperty("druid.indexer.task.encapsulatedTask", "true")
         .addCommonProperty(
             "druid.extensions.loadList",
             "[\"druid-s3-extensions\", \"druid-kafka-indexing-service\","
-            + "\"druid-multi-stage-query\", \"postgresql-metadata-storage\"]"
+            + "\"druid-multi-stage-query\", \"postgresql-metadata-storage\", 
\"druid-kubernetes-overlord-extensions\"]"
         );
   }
 
diff --git 
a/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml 
b/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml
new file mode 100644
index 00000000000..c81ce338670
--- /dev/null
+++ b/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml
@@ -0,0 +1,4 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: druid-operator-system
diff --git 
a/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml 
b/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml
new file mode 100644
index 00000000000..fe0bf32a7fd
--- /dev/null
+++ b/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml
@@ -0,0 +1,30 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: druid-cluster
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+      - configmaps
+    verbs:
+      - '*'
+  - apiGroups: ["batch"]
+    resources: ["jobs"]
+    verbs: ["get", "watch", "list", "delete", "create"]
+  - apiGroups: [""]
+    resources: ["pods", "pods/log"]
+    verbs: ["get", "watch", "list", "delete", "create"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: druid-cluster
+subjects:
+  - kind: ServiceAccount
+    name: default
+roleRef:
+  kind: Role
+  name: druid-cluster
+  apiGroup: rbac.authorization.k8s.io
diff --git 
a/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml 
b/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml
new file mode 100644
index 00000000000..c60b7d1f1f7
--- /dev/null
+++ 
b/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml
@@ -0,0 +1,114 @@
+apiVersion: "druid.apache.org/v1alpha1"
+kind: "Druid"
+metadata:
+  name: test-cluster-${service}
+spec:
+  image: ${image}
+  startScript: /druid.sh
+  scalePvcSts: true
+  rollingDeploy: true
+  defaultProbes: false
+  podLabels:
+    environment: stage
+    release: alpha
+  podAnnotations:
+    dummy: k8s_extn_needs_atleast_one_annotation
+  volumes:
+    - name: mysqlconnector
+      emptyDir: { }
+  securityContext:
+    fsGroup: 0
+    runAsUser: 0
+    runAsGroup: 0
+  containerSecurityContext:
+    privileged: true
+  commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
+  common.runtime.properties: |
+${commonRuntimeProperties}
+  jvm.options: |-
+    -server
+    -Djava.net.preferIPv4Stack=true
+    -XX:MaxDirectMemorySize=10240g
+    -Duser.timezone=UTC
+    -Dfile.encoding=UTF-8
+    -Dlog4j.debug
+    -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+  log4j.config: |-
+    <?xml version="1.0" encoding="UTF-8" ?>
+    <Configuration status="WARN">
+        <Appenders>
+            <Console name="Console" target="SYSTEM_OUT">
+                <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level 
%logger{36} - %msg%n"/>
+            </Console>
+            <File name="FileAppender" 
fileName="log/${sys:druid.node.type}.log">
+                <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] 
%-5level %logger{36} - %msg%n"/>
+            </File>
+        </Appenders>
+        <Loggers>
+            <Root level="info">
+                <AppenderRef ref="Console"/>
+                <AppenderRef ref="FileAppender"/>
+            </Root>
+        </Loggers>
+    </Configuration>
+  env:
+    - name: POD_NAME
+      valueFrom:
+        fieldRef:
+          fieldPath: metadata.name
+    - name: POD_NAMESPACE
+      valueFrom:
+        fieldRef:
+          fieldPath: metadata.namespace
+  nodes:
+    ${service}s:
+      nodeType: ${service}
+      priorityClassName: system-cluster-critical
+      druid.port: ${port}
+      services:
+        - spec:
+            type: NodePort
+            ports:
+              - name: http
+                port: ${port}
+                targetPort: ${port}
+                nodePort: ${port}
+      replicas: 1
+      nodeConfigMountPath: "/opt/druid/conf/druid/cluster/${serviceFolder}"
+      runtime.properties: |
+${nodeRuntimeProperties}
+      livenessProbe:
+        failureThreshold: 10
+        httpGet:
+          path: /status/health
+          port: ${port}
+        initialDelaySeconds: 5
+        periodSeconds: 10
+        successThreshold: 1
+        timeoutSeconds: 5
+      readinessProbe:
+        failureThreshold: 20
+        httpGet:
+          path: /status/health
+          port: ${port}
+        initialDelaySeconds: 5
+        periodSeconds: 10
+        successThreshold: 1
+        timeoutSeconds: 5
+      startUpProbe:
+        failureThreshold: 20
+        httpGet:
+          path: /status/health
+          port: ${port}
+        initialDelaySeconds: 60
+        periodSeconds: 30
+        successThreshold: 1
+        timeoutSeconds: 10
+      volumeMounts:
+        - mountPath: /druid/data
+          name: druid-shared-storage
+      volumes:
+        - name: druid-shared-storage
+          hostPath:
+            path: /druid/shared-storage
+            type: DirectoryOrCreate
diff --git a/extensions-core/kubernetes-overlord-extensions/pom.xml 
b/extensions-core/kubernetes-overlord-extensions/pom.xml
index d6579fbd752..f64125bc1fc 100644
--- a/extensions-core/kubernetes-overlord-extensions/pom.xml
+++ b/extensions-core/kubernetes-overlord-extensions/pom.xml
@@ -270,7 +270,6 @@
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
 
   <build>
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index aa205900291..395b662a4f0 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -204,7 +204,7 @@ public abstract class AbstractTask implements Task
     // isEncapsulatedTask() currently means "isK8sIngestion".
     // We don't need to push reports and status here for other ingestion 
methods.
     if (!toolbox.getConfig().isEncapsulatedTask()) {
-      log.debug("Not pushing task logs and reports from task.");
+      log.info("Not pushing task logs and reports from task.");
       return;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to