This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 62261b05ef4 Move context related things from DartSqlResource to
DartSqlEngine (#18012)
62261b05ef4 is described below
commit 62261b05ef410b717a52bec8d143b8d8d90ac99e
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Tue May 20 06:56:01 2025 +0200
Move context related things from DartSqlResource to DartSqlEngine (#18012)
- provides a way for the SqlEngine to initialize the context
- moves the initialization of DART_QUERY_ID and other DruidQueryConfig
things to be set by DartSqlEngine
- adds support for processing multiple patterns in quidem.filter
This enables all methods of SqlStatementFactory to function for Dart
without the need to set dartQueryId
---
.../msq/dart/controller/http/DartSqlResource.java | 28 ++---------------
.../msq/dart/controller/sql/DartSqlEngine.java | 35 ++++++++++++++++++++--
.../dart/controller/http/DartSqlResourceTest.java | 6 ++--
.../org/apache/druid/msq/test/CalciteDartTest.java | 3 --
.../druid/msq/test/DartComponentSupplier.java | 1 -
.../org/apache/druid/sql/AbstractStatement.java | 2 +-
.../apache/druid/sql/calcite/run/SqlEngine.java | 7 +++++
.../apache/druid/quidem/DruidQuidemTestBase.java | 25 ++++++++++++----
8 files changed, 64 insertions(+), 43 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
index ea45607f1b4..103693e3c31 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
@@ -31,10 +31,7 @@ import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
-import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.query.BaseQuery;
-import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.ResponseContextConfig;
@@ -71,7 +68,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.UUID;
import java.util.stream.Collectors;
/**
@@ -89,8 +85,8 @@ public class DartSqlResource extends SqlResource
private final SqlLifecycleManager sqlLifecycleManager;
private final DartSqlClients sqlClients;
private final AuthorizerMapper authorizerMapper;
- private final DefaultQueryConfig dartQueryConfig;
+ // make dartqueryId a prefix the {{queryid}}-{{startupTime}}-{{queryIndex}}
@Inject
public DartSqlResource(
final ObjectMapper jsonMapper,
@@ -101,8 +97,7 @@ public class DartSqlResource extends SqlResource
final DartSqlClients sqlClients,
final ServerConfig serverConfig,
final ResponseContextConfig responseContextConfig,
- @Self final DruidNode selfNode,
- @Dart final DefaultQueryConfig dartQueryConfig
+ @Self final DruidNode selfNode
)
{
super(
@@ -118,7 +113,6 @@ public class DartSqlResource extends SqlResource
this.sqlLifecycleManager = sqlLifecycleManager;
this.sqlClients = sqlClients;
this.authorizerMapper = authorizerMapper;
- this.dartQueryConfig = dartQueryConfig;
}
/**
@@ -215,24 +209,6 @@ public class DartSqlResource extends SqlResource
{
final Map<String, Object> context = new HashMap<>(sqlQuery.getContext());
- // Default context keys from dartQueryConfig.
- for (Map.Entry<String, Object> entry :
dartQueryConfig.getContext().entrySet()) {
- context.putIfAbsent(entry.getKey(), entry.getValue());
- }
-
- /**
- * Dart queryId must be globally unique, so we cannot use the
user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
- * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in {@link
DartSqlResource#doPost}, overriding whatever
- * the user may have provided. This becomes the {@link
Controller#queryId()}.
- *
- * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still
registered with the {@link SqlLifecycleManager}
- * for purposes of query cancellation.
- *
- * The user-provided {@link BaseQuery#QUERY_ID} is ignored.
- */
- final String dartQueryId = UUID.randomUUID().toString();
- context.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId);
-
return super.doPost(sqlQuery.withOverridenContext(context), req);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index afcd54061c6..a75415f7d05 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -29,12 +29,14 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
import org.apache.druid.msq.sql.DartQueryKitSpecFactory;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.initialization.ServerConfig;
@@ -47,6 +49,7 @@ import org.apache.druid.sql.calcite.run.SqlEngines;
import org.apache.druid.sql.destination.IngestDestination;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
@LazySingleton
@@ -60,6 +63,7 @@ public class DartSqlEngine implements SqlEngine
private final ExecutorService controllerExecutor;
private final ServerConfig serverConfig;
private final QueryKitSpecFactory queryKitSpecFactory;
+ private final DefaultQueryConfig dartQueryConfig;
@Inject
public DartSqlEngine(
@@ -67,7 +71,8 @@ public class DartSqlEngine implements SqlEngine
DartControllerRegistry controllerRegistry,
DartControllerConfig controllerConfig,
DartQueryKitSpecFactory queryKitSpecFactory,
- ServerConfig serverConfig
+ ServerConfig serverConfig,
+ @Dart DefaultQueryConfig dartQueryConfig
)
{
this(
@@ -76,7 +81,8 @@ public class DartSqlEngine implements SqlEngine
controllerConfig,
Execs.multiThreaded(controllerConfig.getConcurrentQueries(),
"dart-controller-%s"),
queryKitSpecFactory,
- serverConfig
+ serverConfig,
+ dartQueryConfig
);
}
@@ -86,7 +92,8 @@ public class DartSqlEngine implements SqlEngine
DartControllerConfig controllerConfig,
ExecutorService controllerExecutor,
QueryKitSpecFactory queryKitSpecFactory,
- ServerConfig serverConfig
+ ServerConfig serverConfig,
+ DefaultQueryConfig dartQueryConfig
)
{
this.controllerContextFactory = controllerContextFactory;
@@ -95,6 +102,7 @@ public class DartSqlEngine implements SqlEngine
this.controllerExecutor = controllerExecutor;
this.queryKitSpecFactory = queryKitSpecFactory;
this.serverConfig = serverConfig;
+ this.dartQueryConfig = dartQueryConfig;
}
@Override
@@ -198,4 +206,25 @@ public class DartSqlEngine implements SqlEngine
// Defensive, because we expect this method will not be called without the
CAN_INSERT and CAN_REPLACE features.
throw DruidException.defensive("Cannot execute DML commands with
engine[%s]", name());
}
+
+ @Override
+ public void initContextMap(Map<String, Object> contextMap)
+ {
+ // Default context keys from dartQueryConfig.
+ for (Map.Entry<String, Object> entry :
dartQueryConfig.getContext().entrySet()) {
+ contextMap.putIfAbsent(entry.getKey(), entry.getValue());
+ }
+ /**
+ * Dart queryId must be globally unique, so we cannot use the
user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
+ * or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in this
method, overriding whatever
+ * the user may have provided. This becomes the {@link
Controller#queryId()}.
+ *
+ * The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still
registered with the {@link SqlLifecycleManager}
+ * for purposes of query cancellation.
+ *
+ * The user-provided {@link BaseQuery#QUERY_ID} is ignored.
+ */
+ final String dartQueryId = UUID.randomUUID().toString();
+ contextMap.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId);
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 90fc6dc810d..87efb900cb9 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -206,7 +206,8 @@ public class DartSqlResourceTest extends MSQTestBase
StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
),
new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
- new ServerConfig()
+ new ServerConfig(),
+ new DefaultQueryConfig(ImmutableMap.of("foo", "bar"))
);
final DruidSchemaCatalog rootSchema =
QueryFrameworkUtils.createMockRootSchema(
@@ -256,8 +257,7 @@ public class DartSqlResourceTest extends MSQTestBase
dartSqlClients,
new ServerConfig() /* currently only used for error transform strategy
*/,
ResponseContextConfig.newConfig(false),
- SELF_NODE,
- new DefaultQueryConfig(ImmutableMap.of("foo", "bar"))
+ SELF_NODE
);
// Setup mocks
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
index 9a135263b03..afda9e44628 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
@@ -30,8 +30,6 @@ import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.jupiter.api.Test;
-import java.util.UUID;
-
@SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class)
public class CalciteDartTest extends BaseCalciteQueryTest
{
@@ -41,7 +39,6 @@ public class CalciteDartTest extends BaseCalciteQueryTest
return new QueryTestBuilder(new CalciteTestConfig(true))
.queryContext(
ImmutableMap.<String, Object>builder()
- .put(QueryContexts.CTX_DART_QUERY_ID,
UUID.randomUUID().toString())
.put(QueryContexts.ENABLE_DEBUG, true)
.build()
)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
index 3c0849a6e99..c45c540b1c5 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
@@ -124,7 +124,6 @@ public class DartComponentSupplier extends
AbstractMSQComponentSupplierDelegate
static class DartTestOverrideModule implements DruidModule
{
-
@Provides
@LazySingleton
public DruidMeta createMeta(DartDruidMeta druidMeta)
diff --git a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
index 160373b17d4..32003a9bc32 100644
--- a/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/AbstractStatement.java
@@ -83,7 +83,7 @@ public abstract class AbstractStatement implements Closeable
this.reporter = new SqlExecutionReporter(this, remoteAddress);
this.queryPlus = queryPlus;
this.queryContext = new HashMap<>(queryPlus.context());
-
+ sqlToolbox.engine.initContextMap(queryContext);
// "bySegment" results are never valid to use with SQL because the result
format is incompatible
// so, overwrite any user specified context to avoid exceptions down the
line
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
index 1d33b019e68..3f906b9ea40 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
@@ -109,4 +109,11 @@ public interface SqlEngine
RelRoot relRoot,
PlannerContext plannerContext
) throws ValidationException;
+
+ /**
+ * Hook that lets the engine modify the given query context map in place; the default implementation does nothing.
+ */
+ default void initContextMap(Map<String, Object> contextMap)
+ {
+ }
}
diff --git a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemTestBase.java
b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemTestBase.java
index 79e2261d0f9..f71e500a9b8 100644
--- a/sql/src/test/java/org/apache/druid/quidem/DruidQuidemTestBase.java
+++ b/sql/src/test/java/org/apache/druid/quidem/DruidQuidemTestBase.java
@@ -29,6 +29,8 @@ import net.hydromatic.quidem.Quidem.ConfigBuilder;
import org.apache.calcite.test.DiffTestCase;
import org.apache.calcite.util.Closer;
import org.apache.calcite.util.Util;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.apache.commons.io.filefilter.OrFileFilter;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.druid.concurrent.Threads;
@@ -99,7 +101,7 @@ public abstract class DruidQuidemTestBase
private static final String PROPERTY_FILTER = "quidem.filter";
- private FileFilter filter = TrueFileFilter.INSTANCE;
+ private final FileFilter filter;
private DruidQuidemRunner druidQuidemRunner;
@@ -111,13 +113,24 @@ public abstract class DruidQuidemTestBase
public DruidQuidemTestBase(DruidQuidemRunner druidQuidemRunner)
{
String filterStr = System.getProperty(PROPERTY_FILTER, null);
- if (filterStr != null) {
- if (!filterStr.endsWith("*") && !filterStr.endsWith(IQ_SUFFIX)) {
- filterStr = filterStr + IQ_SUFFIX;
+ filter = buildFileFilter(filterStr);
+ this.druidQuidemRunner = druidQuidemRunner;
+ }
+
+ private IOFileFilter buildFileFilter(String filterStr)
+ {
+ if (null == filterStr) {
+ return TrueFileFilter.INSTANCE;
+ }
+ List<IOFileFilter> fileFilters = new ArrayList<>();
+ for (String filter : filterStr.split(",")) {
+ // Append IQ_SUFFIX to this single pattern (not to the whole comma-separated property) when it has no wildcard/suffix.
+ if (!filter.endsWith("*") && !filter.endsWith(IQ_SUFFIX)) {
+ filter = filter + IQ_SUFFIX;
}
- filter = new WildcardFileFilter(filterStr);
+ fileFilters.add(new WildcardFileFilter(filter));
}
- this.druidQuidemRunner = druidQuidemRunner;
+ return new OrFileFilter(fileFilters);
}
protected static class QuidemTestCaseConfiguration
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]