This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3413057f3ea some enhancements to embedded tests for writing query oriented tests (#18228)
3413057f3ea is described below
commit 3413057f3eaff4f135d4452d1def588e6569368f
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Jul 10 18:45:13 2025 -0700
some enhancements to embedded tests for writing query oriented tests (#18228)
---
.../msq/EmbeddedDurableShuffleStorageTest.java | 58 +++++------
.../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 52 +++++-----
.../testing/embedded/EmbeddedDruidCluster.java | 21 +++-
.../testing/embedded/EmbeddedDruidServer.java | 108 +++++++++++++--------
.../druid/testing/embedded/EmbeddedHistorical.java | 14 +++
.../druid/testing/embedded/EmbeddedResource.java | 12 +++
.../testing/embedded/EmbeddedServerLifecycle.java | 13 +--
7 files changed, 168 insertions(+), 110 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
index fd2b9bc535b..e3f07a1afe2 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDurableShuffleStorageTest.java
@@ -55,7 +55,7 @@ import
org.apache.druid.testing.embedded.minio.MinIOStorageResource;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.testcontainers.shaded.com.google.common.io.ByteStreams;
@@ -101,17 +101,19 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
return EmbeddedDruidCluster
.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
- .addExtension(DartControllerModule.class)
- .addExtension(DartWorkerModule.class)
- .addExtension(DartControllerMemoryManagementModule.class)
- .addExtension(DartWorkerMemoryManagementModule.class)
- .addExtension(IndexerMemoryManagementModule.class)
- .addExtension(MSQDurableStorageModule.class)
- .addExtension(MSQIndexingModule.class)
- .addExtension(MSQSqlModule.class)
- .addExtension(SqlTaskModule.class)
- .addExtension(MSQExternalDataSourceModule.class)
- .addExtension(S3StorageConnectorModule.class)
+ .addExtensions(
+ DartControllerModule.class,
+ DartWorkerModule.class,
+ DartControllerMemoryManagementModule.class,
+ DartWorkerMemoryManagementModule.class,
+ IndexerMemoryManagementModule.class,
+ MSQDurableStorageModule.class,
+ MSQIndexingModule.class,
+ MSQSqlModule.class,
+ SqlTaskModule.class,
+ MSQExternalDataSourceModule.class,
+ S3StorageConnectorModule.class
+ )
.addResource(storageResource)
.addResource(msqStorageResource)
.addServer(coordinator)
@@ -122,19 +124,25 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
.addServer(router);
}
- @BeforeEach
- void setUpEach()
+ @BeforeAll
+ final void setupData() throws IOException
{
msqApis = new EmbeddedMSQApis(cluster, overlord);
dataSource = EmbeddedClusterApis.createTestDatasourceName();
+ loadWikipediaTable();
+ }
+
+ @Override
+ protected void beforeEachTest()
+ {
+ // do nothing here, the super version of this method generates a new value
for dataSource field each time, but
+ // we are setting that in our @BeforeAll where we are just inserting data
once and re-using between runs
}
@Test
@Timeout(120)
- public void test_selectFirstPage() throws IOException
+ public void test_selectFirstPage()
{
- loadWikipediaTable();
-
final String sql = StringUtils.format(
"SET durableShuffleStorage = TRUE;\n"
+ "SET targetPartitionsPerWorker = 3;\n"
@@ -158,10 +166,8 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
@Test
@Timeout(120)
- public void test_selectCount() throws IOException
+ public void test_selectCount()
{
- loadWikipediaTable();
-
final String sql = StringUtils.format(
"SET durableShuffleStorage = TRUE;\n"
+ "SET targetPartitionsPerWorker = 3;\n"
@@ -182,10 +188,8 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
@Test
@Timeout(120)
- public void test_selectJoin_broadcast() throws IOException
+ public void test_selectJoin_broadcast()
{
- loadWikipediaTable();
-
final String sql = StringUtils.format(
"SET durableShuffleStorage = TRUE;\n"
+ "SET sqlJoinAlgorithm = 'broadcast';\n"
@@ -218,10 +222,8 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
@Test
@Timeout(120)
- public void test_selectJoin_sortMerge() throws IOException
+ public void test_selectJoin_sortMerge()
{
- loadWikipediaTable();
-
final String sql = StringUtils.format(
"SET durableShuffleStorage = TRUE;\n"
+ "SET sqlJoinAlgorithm = 'sortMerge';\n"
@@ -254,10 +256,8 @@ public class EmbeddedDurableShuffleStorageTest extends
EmbeddedClusterTestBase
@Test
@Timeout(120)
- public void test_selectJoin_sortMerge_durableDestination() throws IOException
+ public void test_selectJoin_sortMerge_durableDestination()
{
- loadWikipediaTable();
-
final String sql = StringUtils.format(
"SET durableShuffleStorage = TRUE;\n"
+ "SET sqlJoinAlgorithm = 'sortMerge';\n"
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index c2e770b4490..8ac2d13c237 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -99,8 +99,6 @@ public class EmbeddedMSQRealtimeQueryTest extends
EmbeddedClusterTestBase
@Override
public EmbeddedDruidCluster createCluster()
{
- final EmbeddedDruidCluster cluster =
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
-
kafka = new KafkaResource();
coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
@@ -121,30 +119,32 @@ public class EmbeddedMSQRealtimeQueryTest extends
EmbeddedClusterTestBase
.addProperty("druid.processing.numThreads", "3")
.addProperty("druid.worker.capacity", "4");
- cluster.addExtension(KafkaIndexTaskModule.class)
- .addExtension(DartControllerModule.class)
- .addExtension(DartWorkerModule.class)
- .addExtension(DartControllerMemoryManagementModule.class)
- .addExtension(DartControllerModule.class)
- .addExtension(DartWorkerMemoryManagementModule.class)
- .addExtension(DartWorkerModule.class)
- .addExtension(IndexerMemoryManagementModule.class)
- .addExtension(MSQDurableStorageModule.class)
- .addExtension(MSQIndexingModule.class)
- .addExtension(MSQSqlModule.class)
- .addExtension(SqlTaskModule.class)
- .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
- .addCommonProperty("druid.msq.dart.enabled", "true")
- .useLatchableEmitter()
- .addResource(kafka)
- .addServer(coordinator)
- .addServer(overlord)
- .addServer(indexer)
- .addServer(broker)
- .addServer(historical)
- .addServer(router);
-
- return cluster;
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .addExtensions(
+ KafkaIndexTaskModule.class,
+ DartControllerModule.class,
+ DartWorkerModule.class,
+ DartControllerMemoryManagementModule.class,
+ DartControllerModule.class,
+ DartWorkerMemoryManagementModule.class,
+ DartWorkerModule.class,
+ IndexerMemoryManagementModule.class,
+ MSQDurableStorageModule.class,
+ MSQIndexingModule.class,
+ MSQSqlModule.class,
+ SqlTaskModule.class
+ )
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+ .addCommonProperty("druid.msq.dart.enabled", "true")
+ .useLatchableEmitter()
+ .addResource(kafka)
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(indexer)
+ .addServer(broker)
+ .addServer(historical)
+ .addServer(router);
}
@BeforeEach
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 6c3cfccbccb..eb5b1611875 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -157,7 +157,7 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
/**
* Adds an extension to this cluster. The list of extensions is populated in
- * the common property {@code druid.extensions.modulesForSimulation}.
+ * the common property {@code druid.extensions.modulesForEmbeddedTest}.
*/
public EmbeddedDruidCluster addExtension(Class<? extends DruidModule>
moduleClass)
{
@@ -166,6 +166,19 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
return this;
}
+ /**
+ * Adds extensions to this cluster.
+ *
+ * @see #addExtension(Class)
+ */
+ @SafeVarargs
+ public final EmbeddedDruidCluster addExtensions(Class<? extends
DruidModule>... moduleClasses)
+ {
+ validateNotStarted();
+ extensionModules.addAll(List.of(moduleClasses));
+ return this;
+ }
+
/**
* Adds a Druid server to this cluster. A server added to the cluster after
the
* cluster has started must be started explicitly by calling
@@ -173,9 +186,12 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
*/
public EmbeddedDruidCluster addServer(EmbeddedDruidServer server)
{
- server.onAddedToCluster(this, commonProperties);
+ server.onAddedToCluster(commonProperties);
servers.add(server);
resources.add(server);
+ if (startedFirstDruidServer) {
+ server.beforeStart(this);
+ }
return this;
}
@@ -247,6 +263,7 @@ public class EmbeddedDruidCluster implements
ClusterReferencesProvider, Embedded
}
log.info("Starting resource[%s].", resource);
+ resource.beforeStart(this);
resource.start();
resource.onStarted(this);
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
index c33247df2bf..cdc7247b17f 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidServer.java
@@ -22,6 +22,7 @@ package org.apache.druid.testing.embedded;
import com.google.inject.Binder;
import com.google.inject.Injector;
import org.apache.druid.cli.ServerRunnable;
+import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -29,7 +30,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.utils.RuntimeInfo;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
@@ -43,7 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
public abstract class EmbeddedDruidServer<T extends EmbeddedDruidServer<T>>
implements EmbeddedResource
{
private static final Logger log = new Logger(EmbeddedDruidServer.class);
- protected static final long MEM_100_MB = 100_000_000;
+ protected static final long MEM_100_MB = HumanReadableBytes.parse("100M");
/**
* A static incremental ID is used instead of a random number to ensure that
@@ -57,6 +60,7 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
private long serverMemory = MEM_100_MB;
private long serverDirectMemory = MEM_100_MB;
private final Map<String, String> serverProperties = new HashMap<>();
+ private final List<BeforeStart> beforeStartHooks = new ArrayList<>();
private final ServerReferenceHolder referenceHolder = new
ServerReferenceHolder();
EmbeddedDruidServer()
@@ -66,6 +70,34 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
this.getClass().getSimpleName(),
SERVER_ID.incrementAndGet()
);
+ beforeStartHooks.add(
+ (cluster, self) -> {
+ // Add properties for temporary directories used by the servers
+ final String logsDirectory =
cluster.getTestFolder().getOrCreateFolder("indexer-logs").getAbsolutePath();
+ final String taskDirectory =
cluster.getTestFolder().newFolder().getAbsolutePath();
+ final String storageDirectory =
cluster.getTestFolder().newFolder().getAbsolutePath();
+ log.info(
+ "Server[%s] using directories: task directory[%s], logs
directory[%s], storage directory[%s].",
+ self.getName(),
+ taskDirectory,
+ logsDirectory,
+ storageDirectory
+ );
+ self.addProperty("druid.host", "localhost");
+ self.addProperty("druid.indexer.task.baseDir", taskDirectory);
+ self.addProperty("druid.indexer.logs.directory", logsDirectory);
+ self.addProperty("druid.storage.storageDirectory", storageDirectory);
+
+ // Add properties for Zookeeper
+ if (cluster.getZookeeper() != null) {
+ self.addProperty("druid.zk.service.host",
cluster.getZookeeper().getConnectString());
+ }
+
+ // Add properties for RuntimeInfoModule
+ self.addProperty(RuntimeInfoModule.SERVER_MEMORY_PROPERTY,
String.valueOf(serverMemory));
+ self.addProperty(RuntimeInfoModule.SERVER_DIRECT_MEMORY_PROPERTY,
String.valueOf(serverDirectMemory));
+ }
+ );
}
@Override
@@ -90,6 +122,14 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
}
}
+ @Override
+ public void beforeStart(EmbeddedDruidCluster cluster)
+ {
+ for (BeforeStart hook : beforeStartHooks) {
+ hook.run(cluster, this);
+ }
+ }
+
/**
* @return Name of this server = type + 2-digit ID.
*/
@@ -110,6 +150,16 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
return (T) this;
}
+ /**
+ * Adds a {@link BeforeStart} to run as part of {@link
#beforeStart(EmbeddedDruidCluster)}
+ */
+ @SuppressWarnings("UnusedReturnValue")
+ public final T addBeforeStartHook(BeforeStart hook)
+ {
+ beforeStartHooks.add(hook);
+ return (T) this;
+ }
+
/**
* Sets the amount of heap memory visible to the server through {@link
RuntimeInfo}.
*/
@@ -132,11 +182,9 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
* Called from {@link EmbeddedDruidCluster#addServer(EmbeddedDruidServer)} to
* tie the lifecycle of this server to the cluster.
*/
- final void onAddedToCluster(EmbeddedDruidCluster cluster, Properties
commonProperties)
+ final void onAddedToCluster(Properties commonProperties)
{
- this.lifecycle.set(
- new EmbeddedServerLifecycle(this, cluster.getTestFolder(),
cluster.getZookeeper(), commonProperties)
- );
+ this.lifecycle.set(new EmbeddedServerLifecycle(this, commonProperties));
}
/**
@@ -160,45 +208,9 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
* this server. This must be called only after all the resources required by
* the Druid server have been initialized.
*/
- final Properties getStartupProperties(
- TestFolder testFolder,
- EmbeddedZookeeper zookeeper
- )
+ final Properties getStartupProperties()
{
final Properties serverProperties = new Properties();
-
- // Add properties for temporary directories used by the servers
- final String logsDirectory =
testFolder.getOrCreateFolder("indexer-logs").getAbsolutePath();
- final String taskDirectory = testFolder.newFolder().getAbsolutePath();
- final String storageDirectory = testFolder.newFolder().getAbsolutePath();
- log.info(
- "Server[%s] using directories: task directory[%s], logs directory[%s],
storage directory[%s].",
- name, taskDirectory, logsDirectory, storageDirectory
- );
- serverProperties.setProperty("druid.indexer.task.baseDir", taskDirectory);
- serverProperties.setProperty("druid.indexer.logs.directory",
logsDirectory);
- serverProperties.setProperty("druid.storage.storageDirectory",
storageDirectory);
-
- // Add properties for Zookeeper
- if (zookeeper != null) {
- serverProperties.setProperty("druid.zk.service.host",
zookeeper.getConnectString());
- }
-
- if (this instanceof EmbeddedHistorical) {
- serverProperties.setProperty(
- "druid.segmentCache.locations",
- StringUtils.format(
- "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]",
- testFolder.newFolder().getAbsolutePath(),
- MEM_100_MB
- )
- );
- }
-
- // Add properties for RuntimeInfoModule
- serverProperties.setProperty(RuntimeInfoModule.SERVER_MEMORY_PROPERTY,
String.valueOf(serverMemory));
-
serverProperties.setProperty(RuntimeInfoModule.SERVER_DIRECT_MEMORY_PROPERTY,
String.valueOf(serverDirectMemory));
-
serverProperties.putAll(this.serverProperties);
return serverProperties;
}
@@ -252,4 +264,16 @@ public abstract class EmbeddedDruidServer<T extends
EmbeddedDruidServer<T>> impl
*/
void onLifecycleInit(Lifecycle lifecycle);
}
+
+ @FunctionalInterface
+ public interface BeforeStart
+ {
+ /**
+ * Allows a {@link EmbeddedDruidServer} to perform additional
initialization before starting
+ *
+ * @param cluster - the {@link EmbeddedDruidCluster} the {@link
EmbeddedDruidServer} is part of
+ * @param self - the {@link EmbeddedDruidServer} to perform
initialization on
+ */
+ void run(EmbeddedDruidCluster cluster, EmbeddedDruidServer<?> self);
+ }
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedHistorical.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedHistorical.java
index 924e3b45d5a..755c8833858 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedHistorical.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedHistorical.java
@@ -23,6 +23,7 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.cli.CliHistorical;
import org.apache.druid.cli.ServerRunnable;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import java.util.ArrayList;
@@ -35,6 +36,19 @@ import java.util.List;
*/
public class EmbeddedHistorical extends EmbeddedDruidServer<EmbeddedHistorical>
{
+ public EmbeddedHistorical()
+ {
+ addBeforeStartHook((cluster, self) -> {
+ self.addProperty(
+ "druid.segmentCache.locations",
+ StringUtils.format(
+ "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]",
+ cluster.getTestFolder().newFolder().getAbsolutePath(),
+ MEM_100_MB
+ )
+ );
+ });
+ }
@Override
ServerRunnable createRunnable(LifecycleInitHandler handler)
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedResource.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedResource.java
index e777a55c842..c388408c409 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedResource.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedResource.java
@@ -27,6 +27,7 @@ package org.apache.druid.testing.embedded;
*/
public interface EmbeddedResource
{
+
/**
* Starts this resource. Implementations of this method should clean up any
* previous state as it may be called multiple times on a single instance of
@@ -39,6 +40,17 @@ public interface EmbeddedResource
*/
void stop() throws Exception;
+
+ /**
+ * Called before {@link #start()} with a pointer to the current cluster.
This is primarily useful for any
+ * final initialization of {@link EmbeddedDruidServer} that need to
configure themselves based on some
+ * shared resources which have already been started, such as {@link
TestFolder}.
+ */
+ default void beforeStart(EmbeddedDruidCluster cluster)
+ {
+ // do nothing by default
+ }
+
/**
* Called after {@link #start()} with a pointer to the current cluster. This
is intended for use by resources
* that are started before any Druid services, and that need to configure
the Druid services in some way.
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServerLifecycle.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServerLifecycle.java
index 20c330e6913..8cc4948f10d 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServerLifecycle.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServerLifecycle.java
@@ -43,23 +43,14 @@ class EmbeddedServerLifecycle
private final EmbeddedDruidServer server;
- private final TestFolder testFolder;
- private final EmbeddedZookeeper zookeeper;
private final Properties commonProperties;
private ExecutorService executorService;
private final AtomicReference<Lifecycle> lifecycle = new AtomicReference<>();
- EmbeddedServerLifecycle(
- EmbeddedDruidServer server,
- TestFolder testFolder,
- EmbeddedZookeeper zookeeper,
- Properties commonProperties
- )
+ EmbeddedServerLifecycle(EmbeddedDruidServer server, Properties
commonProperties)
{
this.server = server;
- this.zookeeper = zookeeper;
- this.testFolder = testFolder;
this.commonProperties = commonProperties;
}
@@ -151,7 +142,7 @@ class EmbeddedServerLifecycle
try {
final Properties serverProperties = new Properties();
serverProperties.putAll(commonProperties);
- serverProperties.putAll(server.getStartupProperties(testFolder,
zookeeper));
+ serverProperties.putAll(server.getStartupProperties());
final Injector injector = new StartupInjectorBuilder()
.withProperties(serverProperties)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]