This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new aa7a65676b8 Add embedded test for kubernetes task runner using Druid
operator (#18413)
aa7a65676b8 is described below
commit aa7a65676b890116f02d703bf46f20c77b084ba2
Author: Uddeshya Singh <[email protected]>
AuthorDate: Sun Aug 24 23:21:54 2025 +0530
Add embedded test for kubernetes task runner using Druid operator (#18413)
Changes:
- Add `K3sClusterResourceWithOperator`
- Add `KubernetesClusterWithOperatorDockerTest`
- Add operator specific manifest template
---
embedded-tests/pom.xml | 16 ++
.../testing/embedded/k8s/K3sClusterResource.java | 96 ++++++--
.../k8s/K3sClusterWithOperatorResource.java | 271 +++++++++++++++++++++
.../testing/embedded/k8s/K3sDruidService.java | 28 ++-
.../embedded/k8s/KubernetesClusterDockerTest.java | 2 +-
...> KubernetesClusterWithOperatorDockerTest.java} | 35 ++-
.../manifests/druid-operator-namespace.yaml | 4 +
.../resources/manifests/druid-operator-rbac.yaml | 30 +++
.../manifests/druid-service-with-operator.yaml | 114 +++++++++
.../kubernetes-overlord-extensions/pom.xml | 1 -
.../druid/indexing/common/task/AbstractTask.java | 2 +-
11 files changed, 553 insertions(+), 46 deletions(-)
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index a76849f02ef..f464b3a9ff2 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -142,6 +142,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
@@ -399,6 +403,18 @@
<version>6.2.12</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.kubernetes</groupId>
+ <artifactId>client-java-api</artifactId>
+ <version>19.0.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
index 8a28398b751..13c9201976b 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterResource.java
@@ -73,6 +73,7 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
public static final String DRUID_NAMESPACE = "druid";
private static final String NAMESPACE_MANIFEST =
"manifests/druid-namespace.yaml";
+ private static final String DEFAULT_SERVICE_MANIFEST =
"manifests/druid-service.yaml";
private static final String COMMON_CONFIG_MAP = "druid-common-props";
private static final String SERVICE_CONFIG_MAP = "druid-%s-props";
@@ -84,13 +85,20 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
private KubernetesClient client;
private String druidImageName;
+ private String druidManifestTemplate = DEFAULT_SERVICE_MANIFEST;
private final Closer closer = Closer.create();
public K3sClusterResource()
{
// Add the namespace manifest
- manifestFiles.add(Resources.getFileForResource(NAMESPACE_MANIFEST));
+ addManifestResource(NAMESPACE_MANIFEST);
+ }
+
+ public K3sClusterResource addManifestResource(String manifestResourceName)
+ {
+ manifestFiles.add(Resources.getFileForResource(manifestResourceName));
+ return this;
}
public K3sClusterResource addService(K3sDruidService service)
@@ -109,11 +117,20 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
* Uses the Docker test image specified by the system property
* {@link DruidContainerResource#PROPERTY_TEST_IMAGE} for the Druid pods.
*/
- public K3sClusterResource usingTestImage()
+ public K3sClusterResource usingDruidTestImage()
{
return usingDruidImage(DruidContainerResource.getTestDruidImageName());
}
+ /**
+ * Uses the given resource template to create manifests for Druid services.
+ */
+ public K3sClusterResource usingDruidManifestTemplate(String resourceName)
+ {
+ this.druidManifestTemplate = resourceName;
+ return this;
+ }
+
@Override
protected K3sContainer createContainer()
{
@@ -129,6 +146,9 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
// Druid service is discoverable with the Druid service discovery
portBindings.add(port + ":" + port);
}
+ int servicePort = service.getServicePort();
+ container.addExposedPorts(servicePort);
+ portBindings.add(servicePort + ":" + servicePort);
}
container.setPortBindings(portBindings);
@@ -138,37 +158,59 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
@Override
public void onStarted(EmbeddedDruidCluster cluster)
{
- client = new KubernetesClientBuilder()
- .withConfig(Config.fromKubeconfig(getContainer().getKubeConfigYaml()))
- .build();
- closer.register(client);
-
+ initKubernetesClient();
loadLocalDockerImageIntoContainer(druidImageName, cluster.getTestFolder());
-
manifestFiles.forEach(this::applyManifest);
+ initializeDruidServices(cluster);
+ waitUntilPodsAreReady(DRUID_NAMESPACE);
+ waitUntilServicesAreHealthy();
+ }
- // Create common config map
- final Properties commonProperties = new Properties();
- commonProperties.putAll(cluster.getCommonProperties());
- commonProperties.remove("druid.extensions.modulesForEmbeddedTests");
- applyConfigMap(
- newConfigMap(COMMON_CONFIG_MAP, commonProperties,
"common.runtime.properties")
- );
-
- // Create config maps and manifests for each service
+ protected void initializeDruidServices(EmbeddedDruidCluster cluster)
+ {
+ createConfigMapForCommonProperties(cluster);
for (K3sDruidService druidService : services) {
final String serviceConfigMap = StringUtils.format(SERVICE_CONFIG_MAP,
druidService.getName());
applyConfigMap(
newConfigMap(serviceConfigMap, druidService.getProperties(),
"runtime.properties")
);
- applyManifest(druidService);
+ applyManifestYaml(druidService, createManifestYaml(druidService));
}
+ }
- // Wait for all pods to be ready and services to be healthy
-
client.pods().inNamespace(DRUID_NAMESPACE).resources().forEach(this::waitUntilPodIsReady);
+ protected void waitUntilServicesAreHealthy()
+ {
services.forEach(this::waitUntilServiceIsHealthy);
}
+ protected List<K3sDruidService> getServices()
+ {
+ return services;
+ }
+
+ private void createConfigMapForCommonProperties(EmbeddedDruidCluster cluster)
+ {
+ final Properties commonProperties = new Properties();
+ commonProperties.putAll(cluster.getCommonProperties());
+ commonProperties.remove("druid.extensions.modulesForEmbeddedTests");
+ applyConfigMap(
+ newConfigMap(COMMON_CONFIG_MAP, commonProperties,
"common.runtime.properties")
+ );
+ }
+
+ private void initKubernetesClient()
+ {
+ client = new KubernetesClientBuilder()
+ .withConfig(Config.fromKubeconfig(getContainer().getKubeConfigYaml()))
+ .build();
+ closer.register(client);
+ }
+
+ protected void waitUntilPodsAreReady(String namespace)
+ {
+
client.pods().inNamespace(namespace).resources().forEach(this::waitUntilPodIsReady);
+ }
+
@Override
public void stop()
{
@@ -251,13 +293,19 @@ public class K3sClusterResource extends
TestcontainerResource<K3sContainer>
}
/**
- * Creates and applies the manifest for the given Druid service.
+ * Creates the manifest YAML for the given Druid service.
*/
- private void applyManifest(K3sDruidService service)
+ protected String createManifestYaml(K3sDruidService service)
{
- final String manifestYaml = service.createManifestYaml(druidImageName);
- log.info("Applying manifest for service[%s]: %s", service.getName(),
manifestYaml);
+ return service.createManifestYaml(druidManifestTemplate, druidImageName);
+ }
+ /**
+ * Applies the given service manifest YAML to the K3s cluster.
+ */
+ protected void applyManifestYaml(K3sDruidService service, String
manifestYaml)
+ {
+ log.info("Applying manifest for service[%s]: %s", service.getName(),
manifestYaml);
try (ByteArrayInputStream bis = new
ByteArrayInputStream(manifestYaml.getBytes(StandardCharsets.UTF_8))) {
client.load(bis).inNamespace(DRUID_NAMESPACE).serverSideApply();
log.info("Applied manifest for service[%s]", service.getName());
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
new file mode 100644
index 00000000000..20af2365061
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sClusterWithOperatorResource.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.k8s;
+
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.nio.file.Files;
+import java.nio.file.attribute.PosixFilePermission;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.Set;
+import java.util.zip.GZIPInputStream;
+
+public class K3sClusterWithOperatorResource extends K3sClusterResource
+{
+ private static final Logger log = new
Logger(K3sClusterWithOperatorResource.class);
+ private static final String RBAC_MANIFEST =
"manifests/druid-operator-rbac.yaml";
+ private static final String OPERATOR_NAMESPACE_MANIFEST =
"manifests/druid-operator-namespace.yaml";
+ private static final String OPERATOR_NAMESPACE = "druid-operator-system";
+ private static final String HELM_RELEASE_NAME = "druid-operator";
+ private static final String HELM_REPO_NAME = "datainfra";
+ private static final String HELM_REPO_URL = "https://charts.datainfra.io";
+ private static final String HELM_CHART_NAME = "datainfra/druid-operator";
+ private static final String HELM_VERSION = "v3.13.1";
+ private static final String HELM_PLATFORM = "linux-amd64";
+ private static final String HELM_MOUNT_PATH = "/usr/local/bin/helm";
+
+
+ public K3sClusterWithOperatorResource()
+ {
+ super();
+ addManifestResource(RBAC_MANIFEST);
+ addManifestResource(OPERATOR_NAMESPACE_MANIFEST);
+ }
+
+ @Override
+ protected void initializeDruidServices(EmbeddedDruidCluster cluster)
+ {
+ installHelm(cluster);
+ setupOperatorWithHelm();
+ waitUntilPodsAreReady(OPERATOR_NAMESPACE);
+ for (K3sDruidService druidService : getServices()) {
+ applyManifestYaml(druidService, createManifestYaml(druidService,
cluster));
+ }
+ }
+
+ /**
+ * Installs Helm binary in the K3s cluster.
+ */
+ private void installHelm(EmbeddedDruidCluster cluster)
+ {
+ try {
+ File helmBinary = downloadHelmBinary(cluster);
+ this.getContainer().copyFileToContainer(
+ MountableFile.forHostPath(helmBinary.getAbsolutePath()),
+ HELM_MOUNT_PATH
+ );
+ this.getContainer().execInContainer("chmod", "+x", HELM_MOUNT_PATH);
+ log.info("Helm binary installed to /usr/local/bin/helm");
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to download or install Helm binary");
+ throw new RuntimeException("Helm installation failed", e);
+ }
+ }
+
+ private File downloadHelmBinary(EmbeddedDruidCluster cluster) throws
Exception
+ {
+ String helmUrl = StringUtils.format(
+ "https://get.helm.sh/helm-%s-%s.tar.gz",
+ HELM_VERSION,
+ HELM_PLATFORM
+ );
+ log.info("Downloading Helm from URL[%s].", helmUrl);
+
+ File helmFolder = cluster.getTestFolder().getOrCreateFolder("helm");
+ File tarFile = new File(helmFolder, "helm.tar.gz");
+ File helmBinary = new File(helmFolder, "helm");
+
+ if (helmBinary.exists() && helmBinary.canExecute()) {
+ log.info("Helm binary already exists at path[%s]",
helmBinary.getAbsolutePath());
+ return helmBinary;
+ }
+
+ HttpClient client = HttpClient.newBuilder()
+ .connectTimeout(Duration.ofSeconds(30))
+ .build();
+
+ HttpRequest request = HttpRequest.newBuilder()
+ .uri(URI.create(helmUrl))
+ .timeout(Duration.ofSeconds(120))
+ .build();
+
+ HttpResponse<InputStream> response = client.send(request,
HttpResponse.BodyHandlers.ofInputStream());
+
+ if (response.statusCode() != 200) {
+ throw new RuntimeException("Failed to download Helm. Status: " +
response.statusCode());
+ }
+
+ try (InputStream inputStream = response.body();
+ FileOutputStream outputStream = new FileOutputStream(tarFile)) {
+ inputStream.transferTo(outputStream);
+ }
+
+ extractTarGz(tarFile, helmFolder, HELM_PLATFORM + "/helm", "helm");
+
+ Set<PosixFilePermission> permissions = Set.of(
+ PosixFilePermission.OWNER_READ,
+ PosixFilePermission.OWNER_WRITE,
+ PosixFilePermission.OWNER_EXECUTE,
+ PosixFilePermission.GROUP_READ,
+ PosixFilePermission.GROUP_EXECUTE,
+ PosixFilePermission.OTHERS_READ,
+ PosixFilePermission.OTHERS_EXECUTE
+ );
+
+ try {
+ Files.setPosixFilePermissions(helmBinary.toPath(), permissions);
+ }
+ catch (IOException e) {
+ helmBinary.setExecutable(true);
+ }
+
+ tarFile.delete();
+ log.info("Helm binary downloaded and extracted to: %s",
helmBinary.getAbsolutePath());
+ return helmBinary;
+ }
+
+ /**
+ * Extract a specific file from a tar.gz archive.
+ */
+ private void extractTarGz(File tarGzFile, File destFolder, String
sourceEntryPath, String destFileName)
+ throws IOException
+ {
+ try (FileInputStream fis = new FileInputStream(tarGzFile);
+ BufferedInputStream bis = new BufferedInputStream(fis);
+ GZIPInputStream gis = new GZIPInputStream(bis);
+ TarArchiveInputStream tais = new TarArchiveInputStream(gis)) {
+
+ TarArchiveEntry entry;
+ while ((entry = tais.getNextTarEntry()) != null) {
+ if (entry.getName().equals(sourceEntryPath)) {
+ File destFile = new File(destFolder, destFileName);
+ try (FileOutputStream fos = new FileOutputStream(destFile)) {
+ byte[] buffer = new byte[8192];
+ int len;
+ while ((len = tais.read(buffer)) != -1) {
+ fos.write(buffer, 0, len);
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Updates helm repository and installs the druid operator chart in the K3s
cluster.
+ */
+ private void setupOperatorWithHelm()
+ {
+ try {
+ executeHelmCommand("repo", "add", HELM_REPO_NAME, HELM_REPO_URL);
+ executeHelmCommand("repo", "update");
+ executeHelmCommand(
+ "install",
+ HELM_RELEASE_NAME,
+ HELM_CHART_NAME,
+ "--namespace", OPERATOR_NAMESPACE,
+ "--create-namespace",
+ "--set", "env.WATCH_NAMESPACE=" + DRUID_NAMESPACE,
+ "--wait",
+ "--timeout", "3m"
+ );
+ }
+ catch (Exception e) {
+ log.error("Failed to set up Druid Operator with Helm: %s",
e.getMessage());
+ throw new RuntimeException("Failed to execute helm command", e);
+ }
+ }
+
+ /**
+ * Executes a Helm command in the K3s cluster container.
+ */
+ private void executeHelmCommand(String... args) throws Exception
+ {
+ String[] fullCommand = new String[args.length + 3];
+ fullCommand[0] = "sh";
+ fullCommand[1] = "-c";
+ fullCommand[2] = "export KUBECONFIG=/etc/rancher/k3s/k3s.yaml && helm " +
String.join(" ", args);
+
+ try {
+ log.info("Executing command[%s]",
org.apache.commons.lang3.StringUtils.join(fullCommand, " "));
+ Container.ExecResult result =
getContainer().execInContainer(fullCommand);
+
+ if (result.getExitCode() != 0) {
+ log.error("Helm command failed with exit code: %d",
result.getExitCode());
+ if (!result.getStderr().trim().isEmpty()) {
+ log.error("Error: %s", result.getStderr().trim());
+ }
+ throw new RuntimeException("Helm command failed: " + String.join(" ",
args));
+ }
+ }
+ catch (Exception e) {
+ log.error("Exception executing helm command: %s", e.getMessage());
+ throw e;
+ }
+ }
+
+ private String createManifestYaml(K3sDruidService service,
EmbeddedDruidCluster cluster)
+ {
+ String manifestYaml = super.createManifestYaml(service);
+ manifestYaml = StringUtils.replace(
+ manifestYaml,
+ "${commonRuntimeProperties}",
+ buildPropertiesString(cluster.getCommonProperties(), 4)
+ );
+ manifestYaml = StringUtils.replace(
+ manifestYaml,
+ "${nodeRuntimeProperties}",
+ buildPropertiesString(service.getRuntimeProperties(), 8)
+ );
+ return manifestYaml;
+ }
+
+ /**
+ * Builds a properties string to be used in the manifest.yaml file
supporting a uniform indentation.
+ */
+ private static String buildPropertiesString(Properties properties, int
indentationSpaces)
+ {
+ final StringBuilder builder = new StringBuilder();
+ String indentation = " ".repeat(indentationSpaces);
+ for (String key : properties.stringPropertyNames()) {
+
builder.append(indentation).append(key).append("=").append(properties.getProperty(key)).append("\n");
+ }
+ return builder.toString();
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
index 774beb69a27..9a78d9accca 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/K3sDruidService.java
@@ -36,20 +36,36 @@ import java.util.Properties;
*/
public class K3sDruidService
{
- private static final String MANIFEST_TEMPLATE =
"manifests/druid-service.yaml";
-
private final DruidCommand command;
private final Properties properties;
+ private int servicePort;
public K3sDruidService(DruidCommand command)
{
this.command = command;
this.properties = new Properties();
+ this.servicePort = command.getExposedPorts()[0];
addProperty("druid.host", EmbeddedHostname.containerFriendly().toString());
command.getDefaultProperties().forEach(properties::setProperty);
}
+ public K3sDruidService usingPort(int port)
+ {
+ this.servicePort = port;
+ return this;
+ }
+
+ public int getServicePort()
+ {
+ return servicePort;
+ }
+
+ public Properties getRuntimeProperties()
+ {
+ return properties;
+ }
+
public String getName()
{
return command.getName().toLowerCase(Locale.ROOT);
@@ -63,16 +79,16 @@ public class K3sDruidService
/**
* Creates a manifest YAML String for this service.
*/
- public String createManifestYaml(String druidImage)
+ public String createManifestYaml(String manifestTemplateResource, String
druidImage)
{
try {
final String template = Files.readString(
- Resources.getFileForResource(MANIFEST_TEMPLATE).toPath()
+ Resources.getFileForResource(manifestTemplateResource).toPath()
);
String manifest = StringUtils.replace(template, "${service}", getName());
manifest = StringUtils.replace(manifest, "${command}",
command.getName());
- manifest = StringUtils.replace(manifest, "${port}",
String.valueOf(command.getExposedPorts()[0]));
+ manifest = StringUtils.replace(manifest, "${port}",
String.valueOf(servicePort));
manifest = StringUtils.replace(manifest, "${image}", druidImage);
manifest = StringUtils.replace(manifest, "${serviceFolder}",
getServicePropsFolder());
@@ -99,7 +115,7 @@ public class K3sDruidService
return StringUtils.format(
"http://%s:%s/status/health",
EmbeddedHostname.containerFriendly().toString(),
- command.getExposedPorts()[0]
+ String.valueOf(servicePort)
);
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
index e7f618a8c13..c44796ca0f2 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
@@ -40,7 +40,7 @@ public class KubernetesClusterDockerTest extends
IngestionSmokeTest implements L
// Create a K3s cluster with all the required services
final K3sClusterResource k3sCluster = new K3sClusterResource()
- .usingTestImage()
+ .usingDruidTestImage()
.addService(new K3sDruidService(DruidCommand.Server.COORDINATOR))
.addService(new K3sDruidService(DruidCommand.Server.OVERLORD))
.addService(new K3sDruidService(DruidCommand.Server.HISTORICAL))
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
similarity index 72%
copy from
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
copy to
embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
index e7f618a8c13..748dfd55041 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterDockerTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
@@ -28,40 +28,49 @@ import org.junit.jupiter.api.BeforeEach;
/**
* Runs some basic ingestion tests against latest image Druid containers
running
- * on a K3s cluster.
+ * on a K3s cluster with druid-operator.
*/
-public class KubernetesClusterDockerTest extends IngestionSmokeTest implements
LatestImageDockerTest
+public class KubernetesClusterWithOperatorDockerTest extends
IngestionSmokeTest implements LatestImageDockerTest
{
+ private static final String MANIFEST_TEMPLATE =
"manifests/druid-service-with-operator.yaml";
+
@Override
protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
{
final K3sDruidService brokerService = new
K3sDruidService(DruidCommand.Server.BROKER)
- .addProperty("druid.sql.planner.metadataRefreshPeriod", "PT1s");
+ .addProperty("druid.sql.planner.metadataRefreshPeriod", "PT1s")
+ .usingPort(30082);
+
+ final K3sDruidService overlordService = new
K3sDruidService(DruidCommand.Server.OVERLORD)
+ .addProperty("druid.indexer.runner.type", "k8s")
+ .addProperty("druid.indexer.runner.namespace", "druid")
+ .addProperty("druid.indexer.runner.capacity", "4")
+ .usingPort(30090);
- // Create a K3s cluster with all the required services
- final K3sClusterResource k3sCluster = new K3sClusterResource()
- .usingTestImage()
- .addService(new K3sDruidService(DruidCommand.Server.COORDINATOR))
- .addService(new K3sDruidService(DruidCommand.Server.OVERLORD))
- .addService(new K3sDruidService(DruidCommand.Server.HISTORICAL))
- .addService(new K3sDruidService(DruidCommand.Server.MIDDLE_MANAGER))
- .addService(new K3sDruidService(DruidCommand.Server.ROUTER))
+ final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource()
+ .usingDruidTestImage()
+ .usingDruidManifestTemplate(MANIFEST_TEMPLATE)
+ .addService(new
K3sDruidService(DruidCommand.Server.COORDINATOR).usingPort(30081))
+ .addService(overlordService)
+ .addService(new
K3sDruidService(DruidCommand.Server.HISTORICAL).usingPort(30083))
+ .addService(new
K3sDruidService(DruidCommand.Server.ROUTER).usingPort(30088))
.addService(brokerService);
// Add an EmbeddedOverlord and EmbeddedBroker to use their client and
mapper bindings.
overlord.addProperty("druid.plaintextPort", "7090");
broker.addProperty("druid.plaintextPort", "7082");
-
+
return cluster
.useContainerFriendlyHostname()
.addResource(k3sCluster)
.addServer(overlord)
.addServer(broker)
.addServer(eventCollector)
+ .addCommonProperty("druid.indexer.task.encapsulatedTask", "true")
.addCommonProperty(
"druid.extensions.loadList",
"[\"druid-s3-extensions\", \"druid-kafka-indexing-service\","
- + "\"druid-multi-stage-query\", \"postgresql-metadata-storage\"]"
+ + "\"druid-multi-stage-query\", \"postgresql-metadata-storage\",
\"druid-kubernetes-overlord-extensions\"]"
);
}
diff --git
a/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml
b/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml
new file mode 100644
index 00000000000..c81ce338670
--- /dev/null
+++ b/embedded-tests/src/test/resources/manifests/druid-operator-namespace.yaml
@@ -0,0 +1,4 @@
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: druid-operator-system
diff --git
a/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml
b/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml
new file mode 100644
index 00000000000..fe0bf32a7fd
--- /dev/null
+++ b/embedded-tests/src/test/resources/manifests/druid-operator-rbac.yaml
@@ -0,0 +1,30 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+ name: druid-cluster
+rules:
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ - configmaps
+ verbs:
+ - '*'
+ - apiGroups: ["batch"]
+ resources: ["jobs"]
+ verbs: ["get", "watch", "list", "delete", "create"]
+ - apiGroups: [""]
+ resources: ["pods", "pods/log"]
+ verbs: ["get", "watch", "list", "delete", "create"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: druid-cluster
+subjects:
+ - kind: ServiceAccount
+ name: default
+roleRef:
+ kind: Role
+ name: druid-cluster
+ apiGroup: rbac.authorization.k8s.io
diff --git
a/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml
b/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml
new file mode 100644
index 00000000000..c60b7d1f1f7
--- /dev/null
+++
b/embedded-tests/src/test/resources/manifests/druid-service-with-operator.yaml
@@ -0,0 +1,114 @@
+apiVersion: "druid.apache.org/v1alpha1"
+kind: "Druid"
+metadata:
+ name: test-cluster-${service}
+spec:
+ image: ${image}
+ startScript: /druid.sh
+ scalePvcSts: true
+ rollingDeploy: true
+ defaultProbes: false
+ podLabels:
+ environment: stage
+ release: alpha
+ podAnnotations:
+ dummy: k8s_extn_needs_atleast_one_annotation
+ volumes:
+ - name: mysqlconnector
+ emptyDir: { }
+ securityContext:
+ fsGroup: 0
+ runAsUser: 0
+ runAsGroup: 0
+ containerSecurityContext:
+ privileged: true
+ commonConfigMountPath: "/opt/druid/conf/druid/cluster/_common"
+ common.runtime.properties: |
+${commonRuntimeProperties}
+ jvm.options: |-
+ -server
+ -Djava.net.preferIPv4Stack=true
+ -XX:MaxDirectMemorySize=10240g
+ -Duser.timezone=UTC
+ -Dfile.encoding=UTF-8
+ -Dlog4j.debug
+ -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
+ log4j.config: |-
+ <?xml version="1.0" encoding="UTF-8" ?>
+ <Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level
%logger{36} - %msg%n"/>
+ </Console>
+ <File name="FileAppender"
fileName="log/${sys:druid.node.type}.log">
+ <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t]
%-5level %logger{36} - %msg%n"/>
+ </File>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ <AppenderRef ref="FileAppender"/>
+ </Root>
+ </Loggers>
+ </Configuration>
+ env:
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
+ - name: POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ nodes:
+ ${service}s:
+ nodeType: ${service}
+ priorityClassName: system-cluster-critical
+ druid.port: ${port}
+ services:
+ - spec:
+ type: NodePort
+ ports:
+ - name: http
+ port: ${port}
+ targetPort: ${port}
+ nodePort: ${port}
+ replicas: 1
+ nodeConfigMountPath: "/opt/druid/conf/druid/cluster/${serviceFolder}"
+ runtime.properties: |
+${nodeRuntimeProperties}
+ livenessProbe:
+ failureThreshold: 10
+ httpGet:
+ path: /status/health
+ port: ${port}
+ initialDelaySeconds: 5
+ periodSeconds: 10
+ successThreshold: 1
+ timeoutSeconds: 5
+ readinessProbe:
+ failureThreshold: 20
+ httpGet:
+ path: /status/health
+ port: ${port}
+ initialDelaySeconds: 5
+ periodSeconds: 10
+ successThreshold: 1
+ timeoutSeconds: 5
+ startUpProbe:
+ failureThreshold: 20
+ httpGet:
+ path: /status/health
+ port: ${port}
+ initialDelaySeconds: 60
+ periodSeconds: 30
+ successThreshold: 1
+ timeoutSeconds: 10
+ volumeMounts:
+ - mountPath: /druid/data
+ name: druid-shared-storage
+ volumes:
+ - name: druid-shared-storage
+ hostPath:
+ path: /druid/shared-storage
+ type: DirectoryOrCreate
diff --git a/extensions-core/kubernetes-overlord-extensions/pom.xml
b/extensions-core/kubernetes-overlord-extensions/pom.xml
index d6579fbd752..f64125bc1fc 100644
--- a/extensions-core/kubernetes-overlord-extensions/pom.xml
+++ b/extensions-core/kubernetes-overlord-extensions/pom.xml
@@ -270,7 +270,6 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
<build>
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index aa205900291..395b662a4f0 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -204,7 +204,7 @@ public abstract class AbstractTask implements Task
// isEncapsulatedTask() currently means "isK8sIngestion".
// We don't need to push reports and status here for other ingestion
methods.
if (!toolbox.getConfig().isEncapsulatedTask()) {
- log.debug("Not pushing task logs and reports from task.");
+ log.info("Not pushing task logs and reports from task.");
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]