This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ff70ad60179 Migrate query integration tests to embedded framework
(#18978)
ff70ad60179 is described below
commit ff70ad60179c5912007dcf03ebc55a0edae937bb
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Feb 3 18:27:28 2026 +0530
Migrate query integration tests to embedded framework (#18978)
Changes
---------
- Move `ITBroadcastJoinQueryTest` to embedded framework
- Remove `ITWikipediaQueryTest`
- Add `QueryLaningTest` to retain the query laning check, which was the only
useful assertion being done in the wikipedia test
---
.github/workflows/cron-job-its.yml | 2 +-
.github/workflows/standard-its.yml | 15 +-
.../embedded/query/BroadcastJoinQueryTest.java | 157 ++++++++++++++
.../testing/embedded/query/QueryLaningTest.java | 96 +++++++++
integration-tests/README.md | 4 +-
integration-tests/docker/druid.sh | 4 +-
integration-tests/script/docker_compose_args.sh | 4 -
.../java/org/apache/druid/tests/TestNGGroup.java | 7 -
.../tests/query/ITBroadcastJoinQueryTest.java | 165 ---------------
.../druid/tests/query/ITWikipediaQueryTest.java | 225 ---------------------
.../indexer/broadcast_join_index_task.json | 82 --------
...broadcast_join_after_drop_metadata_queries.json | 9 -
.../queries/broadcast_join_metadata_queries.json | 26 ---
.../resources/queries/broadcast_join_queries.json | 29 ---
14 files changed, 259 insertions(+), 566 deletions(-)
diff --git a/.github/workflows/cron-job-its.yml
b/.github/workflows/cron-job-its.yml
index 063d4adddce..25a4259f8df 100644
--- a/.github/workflows/cron-job-its.yml
+++ b/.github/workflows/cron-job-its.yml
@@ -60,7 +60,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- testing_group: [ query, security ]
+ testing_group: [ security ]
uses: ./.github/workflows/reusable-standard-its.yml
needs: build
with:
diff --git a/.github/workflows/standard-its.yml
b/.github/workflows/standard-its.yml
index fba85ac0055..be8b19ef9a2 100644
--- a/.github/workflows/standard-its.yml
+++ b/.github/workflows/standard-its.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- testing_group: [query, security, centralized-datasource-schema]
+ testing_group: [security]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' ||
needs.changes.outputs.common-extensions == 'true' }}
with:
@@ -57,16 +57,3 @@ jobs:
use_indexer: middleManager
override_config_path: ./environment-configs/test-groups/prepopulated-data
group: ${{ matrix.testing_group }}
-
- integration-query-tests-middleManager-mariaDB:
- needs: changes
- uses: ./.github/workflows/reusable-standard-its.yml
- if: ${{ needs.changes.outputs.core == 'true' ||
needs.changes.outputs.common-extensions == 'true' }}
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups: -Dgroups=query
- use_indexer: middleManager
- mysql_driver: org.mariadb.jdbc.Driver
- override_config_path: ./environment-configs/test-groups/prepopulated-data
- group: query
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/BroadcastJoinQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/BroadcastJoinQueryTest.java
new file mode 100644
index 00000000000..3bd29f5354e
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/BroadcastJoinQueryTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.TestIndex;
+import
org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory;
+import
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexer.AbstractIndexerTest;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test for broadcast join with segments created using
+ * {@link BroadcastJoinableMMappedQueryableSegmentizerFactory}.
+ */
+public class BroadcastJoinQueryTest extends AbstractIndexerTest
+{
+ @Override
+ protected void addResources(EmbeddedDruidCluster cluster)
+ {
+ broker.addBeforeStartHook((c, self) -> {
+ self.addProperty(
+ "druid.segmentCache.locations",
+ StringUtils.format(
+ "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]",
+ c.getTestFolder().newFolder().getAbsolutePath(),
+ HumanReadableBytes.parse("100M")
+ )
+ );
+ });
+ }
+
+ @Test
+ public void testJoin()
+ {
+ ingestDataIntoBaseDatasource();
+
+ // Set broadcast rules for the indexed-table datasource
+ final String joinDatasource =
EmbeddedClusterApis.createTestDatasourceName();
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateRulesForDatasource(
+ joinDatasource,
+ List.of(new ForeverBroadcastDistributionRule())
+ )
+ );
+
+ // Ingest a single segment into the indexed-table datasource
+ final Task task2 = MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS
+ .get()
+ .tuningConfig(
+ t -> t.withIndexSpec(
+ IndexSpec.builder().withSegmentLoader(
+ new BroadcastJoinableMMappedQueryableSegmentizerFactory(
+ TestIndex.INDEX_IO,
+ Set.of("user", "language", "added", "deleted")
+ )
+ ).build())
+ )
+ .dynamicPartitionWithMaxRows(100)
+ .granularitySpec("YEAR", "SECOND", true)
+ .dataSource(joinDatasource)
+ .withId(IdUtils.getRandomId());
+ cluster.callApi().runTask(task2, overlord);
+
+ // Wait for the broadcast segment to be loaded on the historical and the
broker
+ coordinator.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("segment/loadQueue/success")
+ .hasDimension(DruidMetrics.DATASOURCE, joinDatasource),
+ agg -> agg.hasCountAtLeast(2)
+ );
+
+ // Run some metadata queries
+ cluster.callApi().waitForAllSegmentsToBeAvailable(joinDatasource,
coordinator, broker);
+ cluster.callApi().verifySqlQuery(
+ "SELECT IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE
TABLE_NAME = '%s'",
+ joinDatasource,
+ "YES,YES"
+ );
+ cluster.callApi().verifySqlQuery(
+ "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME =
'%s'",
+ joinDatasource,
+ "21"
+ );
+
+ // Run some queries
+ Assertions.assertEquals(
+ "triplets,2,1810.0\n" +
+ "speed,2,918.0\n" +
+ "masterYi,2,246.0\n" +
+ "nuclear,2,114.0\n" +
+ "stringer,2,2.0",
+ cluster.runSql("SELECT \"user\", COUNT(*), SUM(\"added\") FROM %s
GROUP BY 1 ORDER BY 3 DESC", joinDatasource)
+ );
+ Assertions.assertEquals(
+ "triplets,2,1810.0\n" +
+ "speed,2,918.0\n" +
+ "masterYi,2,246.0\n" +
+ "nuclear,2,114.0\n" +
+ "stringer,2,2.0",
+ cluster.runSql("SELECT \"user\", COUNT(*), SUM(\"added\") FROM %s
GROUP BY 1 ORDER BY 3 DESC", dataSource)
+ );
+
+ // Run join query, the sums have doubled due to the Cartesian product
+ Assertions.assertEquals(
+ "triplets,4,3620.0\n" +
+ "speed,4,1836.0\n" +
+ "masterYi,4,492.0\n" +
+ "nuclear,4,228.0\n" +
+ "stringer,4,4.0",
+ cluster.runSql(
+ "SELECT a.\"user\", COUNT(*), SUM(a.\"added\")" +
+ " FROM %s a INNER JOIN %s b ON a.\"user\" = b.\"user\"" +
+ " GROUP BY 1 ORDER BY 3 DESC",
+ dataSource, joinDatasource
+ )
+ );
+ }
+
+ private void ingestDataIntoBaseDatasource()
+ {
+ final Task task1 = MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS
+ .get()
+ .dataSource(dataSource)
+ .withId(IdUtils.getRandomId());
+ cluster.callApi().runTask(task1, overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryLaningTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryLaningTest.java
new file mode 100644
index 00000000000..f2b09c29940
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryLaningTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class QueryLaningTest extends QueryTestBase
+{
+ private static final String LANE_1 = "lane1";
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ broker.addProperty("druid.query.scheduler.laning.strategy", "manual")
+ .addProperty("druid.query.scheduler.laning.lanes." + LANE_1, "1");
+
+ return super.createCluster().useDefaultTimeoutForLatchableEmitter(100);
+ }
+
+ @Override
+ protected void beforeAll()
+ {
+ jsonMapper = overlord.bindings().jsonMapper();
+ }
+
+ @Test
+ public void test_queryUsesLaneInQueryContext_inManualStrategy()
+ {
+ final String testDatasource = ingestBasicData();
+ final String result = cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(createQuery("SELECT SUM(\"value\") FROM %s",
testDatasource, LANE_1))
+ ).trim();
+ Assertions.assertEquals("3003.0", result);
+
+ broker.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("query/priority").hasDimension("lane",
LANE_1)
+ );
+ }
+
+ @Test
+ @Disabled("sleep() function does not seem to keep the lane occupied")
+ public void test_queryFails_ifLaneIsFull()
+ {
+ final String testDatasource = ingestBasicData();
+
+ // Fire a slow query which keeps the lane occupied
+ executeQueryAsync(
+ routerEndpoint,
+ createQuery("SELECT sleep(10), SUM(\"value\") FROM %s",
testDatasource, LANE_1)
+ );
+
+ // Fire another query and ensure that we get a capacity exceeded exception
+ cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(createQuery("SELECT SUM(\"value\") FROM %s",
testDatasource, LANE_1))
+ );
+ }
+
+ private ClientSqlQuery createQuery(String sql, String dataSource, String
lane)
+ {
+ return new ClientSqlQuery(
+ StringUtils.format(sql, dataSource),
+ ResultFormat.CSV.name(),
+ false,
+ false,
+ false,
+ Map.of(QueryContexts.LANE_KEY, lane),
+ null
+ );
+ }
+}
diff --git a/integration-tests/README.md b/integration-tests/README.md
index ddf37a57ce3..b9f147e7b91 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -80,8 +80,8 @@ Parameters:
the test group for a given test as an annotation in the respective test class.
A list of test groups can be found at
`integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java`. The
annotation uses a string
constant defined in `TestNGGroup.java`, be sure to use the constant value, not
name. For example,
-if your test has the annotation: `@Test(groups = TestNGGroup.QUERY)` then use
the argument
-`-Dgroups=query`.
+if your test has the annotation: `@Test(groups = TestNGGroup.SECURITY)` then
use the argument
+`-Dgroups=security`.
* Test Name: Use the fully-qualified class name. For example,
`org.apache.druid.tests.query.ITSqlQueryTest`.
diff --git a/integration-tests/docker/druid.sh
b/integration-tests/docker/druid.sh
index 9478a9e9ef4..ce63a22b5a9 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -100,10 +100,10 @@ setupData()
echo "MySQL did not start"
exit 1
fi
- # The "query" and "security" test groups require data to be setup before
running the tests.
+ # The "security" test groups require data to be setup before running the
tests.
# In particular, they requires segments to be download from a pre-existing
s3 bucket.
# This is done by using the loadSpec put into metadatastore and s3
credientials set below.
- if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "upgrade" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ]; then
+ if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u
root druid
fi
diff --git a/integration-tests/script/docker_compose_args.sh
b/integration-tests/script/docker_compose_args.sh
index fcf5bdafa7c..4589828eb78 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -45,10 +45,6 @@ getComposeArgs()
then
# default + with override config + schema registry container
echo "-f ${DOCKERDIR}/docker-compose.yml -f
${DOCKERDIR}/docker-compose.schema-registry.yml"
- elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "centralized-datasource-schema" ]
- then
- # cluster with overriden properties for broker and coordinator
- echo "-f ${DOCKERDIR}/docker-compose.centralized-datasource-schema.yml"
else
# default
echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 6db9e3999f6..e75c79350b8 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -25,11 +25,6 @@ package org.apache.druid.tests;
*/
public class TestNGGroup
{
- /**
- * This group can only be run individually using -Dgroups=query since it
requires specific test data setup.
- */
- public static final String QUERY = "query";
-
/**
* This group can only be run individually using -Dgroups=security since it
requires specific test data setup.
*/
@@ -104,6 +99,4 @@ public class TestNGGroup
* Kinesis stream endpoint for a region must also be pass to mvn with
-Ddruid.test.config.streamEndpoint=<ENDPOINT>
*/
public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";
-
- public static final String CENTRALIZED_DATASOURCE_SCHEMA =
"centralized-datasource-schema";
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
deleted file mode 100644
index 3e5e160c173..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.http.client.HttpClient;
-import
org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.guice.TestClient;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.SqlTestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = {TestNGGroup.QUERY, TestNGGroup.CENTRALIZED_DATASOURCE_SCHEMA})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
-{
- private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class);
- private static final String BROADCAST_JOIN_TASK =
"/indexer/broadcast_join_index_task.json";
- private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE =
"/queries/broadcast_join_metadata_queries.json";
- private static final String
BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE =
"/queries/broadcast_join_after_drop_metadata_queries.json";
- private static final String BROADCAST_JOIN_QUERIES_RESOURCE =
"/queries/broadcast_join_queries.json";
- private static final String BROADCAST_JOIN_DATASOURCE =
"broadcast_join_wikipedia_test";
-
- @Inject
- ServerDiscoveryFactory factory;
-
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @Inject
- SqlTestQueryHelper queryHelper;
-
- @Inject
- DataLoaderHelper dataLoaderHelper;
-
- @Inject
- @TestClient
- HttpClient httpClient;
-
- @Inject
- IntegrationTestingConfig config;
-
- @Test
- public void testBroadcastJoin() throws Exception
- {
- final Closer closer = Closer.create();
- try {
- closer.register(unloader(BROADCAST_JOIN_DATASOURCE));
- closer.register(() -> {
- // remove broadcast rule
- try {
- coordinatorClient.postLoadRules(
- BROADCAST_JOIN_DATASOURCE,
- ImmutableList.of()
- );
- }
- catch (Exception e) {
- LOG.error(e, "Failed to post load rules");
- }
- });
-
- // prepare for broadcast by adding forever broadcast load rule
- coordinatorClient.postLoadRules(
- BROADCAST_JOIN_DATASOURCE,
- ImmutableList.of(new ForeverBroadcastDistributionRule())
- );
-
- // load the data
- String taskJson =
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK),
BROADCAST_JOIN_DATASOURCE);
- indexer.submitTask(taskJson);
-
- dataLoaderHelper.waitUntilDatasourceIsReady(BROADCAST_JOIN_DATASOURCE);
-
- // query metadata until druid schema is refreshed and datasource is
available joinable
- ITRetryUtil.retryUntilTrue(
- () -> {
- try {
- queryHelper.testQueriesFromString(
- queryHelper.getQueryURL(config.getRouterUrl()),
- replaceJoinTemplate(
-
getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_RESOURCE),
- BROADCAST_JOIN_DATASOURCE
- )
- );
- return true;
- }
- catch (Exception ex) {
- LOG.error(ex, "SQL metadata not yet in expected state");
- return false;
- }
- },
- "waiting for SQL metadata refresh"
- );
-
- // now do some queries
- queryHelper.testQueriesFromString(
- queryHelper.getQueryURL(config.getRouterUrl()),
-
replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE),
BROADCAST_JOIN_DATASOURCE)
- );
- }
-
- finally {
- closer.close();
-
- // query metadata until druid schema is refreshed and datasource is no
longer available
- ITRetryUtil.retryUntilTrue(
- () -> {
- try {
- queryHelper.testQueriesFromString(
- queryHelper.getQueryURL(config.getRouterUrl()),
- replaceJoinTemplate(
-
getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_AFTER_DROP_RESOURCE),
- BROADCAST_JOIN_DATASOURCE
- )
- );
- return true;
- }
- catch (Exception ex) {
- LOG.error(ex, "SQL metadata not yet in expected state");
- return false;
- }
- },
- "waiting for SQL metadata refresh"
- );
- }
- }
-
- private static String replaceJoinTemplate(String template, String
joinDataSource)
- {
- return StringUtils.replace(
- StringUtils.replace(template, "%%JOIN_DATASOURCE%%", joinDataSource),
- "%%REGULAR_DATASOURCE%%",
- ITWikipediaQueryTest.WIKIPEDIA_DATA_SOURCE
- );
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
deleted file mode 100644
index 4bb4653877d..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.QueryCapacityExceededException;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.clients.QueryResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import javax.ws.rs.core.MediaType;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Future;
-
-@Test(groups = {TestNGGroup.QUERY, TestNGGroup.CENTRALIZED_DATASOURCE_SCHEMA})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITWikipediaQueryTest
-{
- private static final Logger LOG = new Logger(ITWikipediaQueryTest.class);
-
- public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
- private static final String WIKI_LOOKUP = "wiki-simple";
- private static final String WIKIPEDIA_QUERIES_RESOURCE =
"/queries/wikipedia_editstream_queries.json";
- private static final String WIKIPEDIA_LOOKUP_RESOURCE =
"/queries/wiki-lookup-config.json";
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
- @Inject
- private TestQueryHelper queryHelper;
- @Inject
- private QueryResourceTestClient queryClient;
- @Inject
- private IntegrationTestingConfig config;
-
- @BeforeMethod
- public void before() throws Exception
- {
- // ensure that wikipedia segments are loaded completely
- ITRetryUtil.retryUntilTrue(
- () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE),
"wikipedia segment load"
- );
- if (!coordinatorClient.areLookupsLoaded(WIKI_LOOKUP)) {
- coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
- ITRetryUtil.retryUntilTrue(
- () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia
lookup load"
- );
- }
- }
-
- /**
- * A combination of request Content-Type and Accept HTTP header
- * The first is Content-Type that cannot be null while the 2nd is Accept
that could be null
- * <p>
- * When Accept is null, its value defaults to the value of Content-Type
- */
- @DataProvider
- public static Object[][] encodingCombination()
- {
- return new Object[][]{
- {MediaType.APPLICATION_JSON, null},
- {MediaType.APPLICATION_JSON, MediaType.APPLICATION_JSON},
- {MediaType.APPLICATION_JSON,
SmileMediaTypes.APPLICATION_JACKSON_SMILE},
- {SmileMediaTypes.APPLICATION_JACKSON_SMILE, null},
- {SmileMediaTypes.APPLICATION_JACKSON_SMILE,
MediaType.APPLICATION_JSON},
- {SmileMediaTypes.APPLICATION_JACKSON_SMILE,
SmileMediaTypes.APPLICATION_JACKSON_SMILE},
- };
- }
-
- @Test(dataProvider = "encodingCombination")
- public void testWikipediaQueriesFromFile(String contentType, String accept)
- throws Exception
- {
- // run tests on a new query helper
- TestQueryHelper queryHelper = this.queryHelper.withEncoding(contentType,
accept);
-
- queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE);
- }
-
- @Test
- public void testQueryLaningLaneIsLimited() throws Exception
- {
- ITRetryUtil.retryUntil(
- () -> {
- // the broker is configured with a manually defined query lane,
'one' with limit 1
- // -Ddruid.query.scheduler.laning.type=manual
- // -Ddruid.query.scheduler.laning.lanes.one=1
- // by issuing 50 queries, at least 1 of them will succeed on 'one',
and at least 1 of them will overlap enough to
- // get limited.
- // It's possible but unlikely that these queries execute in a way
that none of them overlap, so we
- // retry this test a few times to compensate for this.
- final int numQueries = 50;
- List<Future<StatusResponseHolder>> futures = new
ArrayList<>(numQueries);
- for (int i = 0; i < numQueries; i++) {
- futures.add(
- queryClient.queryAsync(
- queryHelper.getQueryURL(config.getBrokerUrl()),
- getQueryBuilder().build()
- )
- );
- }
-
- int success = 0;
- int limited = 0;
-
- for (Future<StatusResponseHolder> future : futures) {
- StatusResponseHolder status = future.get();
- if (status.getStatus().getCode() ==
QueryCapacityExceededException.STATUS_CODE) {
- limited++;
-
Assert.assertTrue(status.getContent().contains(QueryCapacityExceededException.makeLaneErrorMessage("one",
1)));
- } else if (status.getStatus().getCode() ==
HttpResponseStatus.OK.getCode()) {
- success++;
- }
- }
-
- try {
- Assert.assertTrue(success > 0);
- Assert.assertTrue(limited > 0);
- return true;
- }
- catch (AssertionError ae) {
- LOG.error(ae, "Got assertion error in
testQueryLaningLaneIsLimited");
- return false;
- }
- },
- true,
- 5000,
- 3,
- "testQueryLaningLaneIsLimited"
- );
-
- // test another to make sure we can still issue one query at a time
- StatusResponseHolder followUp = queryClient.queryAsync(
- queryHelper.getQueryURL(config.getBrokerUrl()),
- getQueryBuilder().build()
- ).get();
-
- Assert.assertEquals(followUp.getStatus().getCode(),
HttpResponseStatus.OK.getCode());
-
- StatusResponseHolder andAnother = queryClient.queryAsync(
- queryHelper.getQueryURL(config.getBrokerUrl()),
- getQueryBuilder().build()
- ).get();
-
- Assert.assertEquals(andAnother.getStatus().getCode(),
HttpResponseStatus.OK.getCode());
- }
-
- @Test
- public void testQueryLaningWithNoLane() throws Exception
- {
- // the broker is configured with a manually defined query lane, 'one' with
limit 1
- // -Ddruid.query.scheduler.laning.type=manual
- // -Ddruid.query.scheduler.laning.lanes.one=1
- // these queries will not belong to the lane so none of them should be
limited
- final int numQueries = 50;
- List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
- for (int i = 0; i < numQueries; i++) {
- futures.add(
- queryClient.queryAsync(
- queryHelper.getQueryURL(config.getBrokerUrl()),
- getQueryBuilder().context(ImmutableMap.of("queryId",
UUID.randomUUID().toString())).build()
- )
- );
- }
-
- int success = 0;
- int limited = 0;
-
- for (Future<StatusResponseHolder> future : futures) {
- StatusResponseHolder status = future.get();
- if (status.getStatus().getCode() ==
QueryCapacityExceededException.STATUS_CODE) {
- limited++;
- } else if (status.getStatus().getCode() ==
HttpResponseStatus.OK.getCode()) {
- success++;
- }
- }
-
- Assert.assertTrue(success > 0);
- Assert.assertEquals(limited, 0);
-
- }
-
- private Druids.TimeseriesQueryBuilder getQueryBuilder()
- {
- return Druids.newTimeseriesQueryBuilder()
- .dataSource("wikipedia_editstream")
- .aggregators(new CountAggregatorFactory("chocula"))
- .intervals("2013-01-01T00:00:00.000/2013-01-08T00:00:00.000")
- .context(ImmutableMap.of("lane", "one", "queryId",
UUID.randomUUID().toString()));
- }
-}
diff --git
a/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json
b/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json
deleted file mode 100644
index 699ca547ff4..00000000000
---
a/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json
+++ /dev/null
@@ -1,82 +0,0 @@
-{
- "type": "index_parallel",
- "spec": {
- "dataSchema": {
- "dataSource": "%%JOIN_DATASOURCE%%",
- "timestampSpec": {
- "column": "timestamp",
- "format": "iso"
- },
- "dimensionsSpec": {
- "dimensions": [
- "page",
- "language",
- "user",
- "unpatrolled",
- "newPage",
- "robot",
- "anonymous",
- "namespace",
- "continent",
- "country",
- "region",
- "city",
- {
- "type": "long",
- "name": "added"
- },
- {
- "type": "long",
- "name": "deleted"
- }
- ]
- },
- "metricsSpec": [
- {
- "type": "count",
- "name": "count"
- },
- {
- "type": "doubleSum",
- "name": "sum_added",
- "fieldName": "added"
- },
- {
- "type": "doubleSum",
- "name": "sum_deleted",
- "fieldName": "deleted"
- },
- {
- "type": "doubleSum",
- "name": "delta",
- "fieldName": "delta"
- }
- ],
- "granularitySpec": {
- "segmentGranularity": "YEAR",
- "queryGranularity": "second"
- }
- },
- "ioConfig": {
- "type": "index_parallel",
- "inputSource": {
- "type": "local",
- "baseDir": "/resources/data/broadcast/",
- "filter": "wikipedia_index_data*"
- },
- "appendToExisting": false,
- "inputFormat": {
- "type": "json"
- }
- },
- "tuningConfig": {
- "type": "index_parallel",
- "indexSpec": {
- "segmentLoader": {
- "type": "broadcastJoinableMMapSegmentFactory",
- "keyColumns": ["user", "language", "added", "deleted"]
- }
- }
- }
- }
-}
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json
b/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json
deleted file mode 100644
index 26e36885c6f..00000000000
---
a/integration-tests/src/test/resources/queries/broadcast_join_after_drop_metadata_queries.json
+++ /dev/null
@@ -1,9 +0,0 @@
-[
- {
- "description": "query information schema to make sure datasource is
joinable and broadcast",
- "query": {
- "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND
IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
- },
- "expectedResults": []
- }
-]
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json
b/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json
deleted file mode 100644
index af036c76638..00000000000
---
a/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json
+++ /dev/null
@@ -1,26 +0,0 @@
-[
- {
- "description": "query information schema to make sure datasource is
joinable and broadcast",
- "query": {
- "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%' AND
IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
- },
- "expectedResults": [
- {
- "TABLE_NAME": "%%JOIN_DATASOURCE%%",
- "IS_JOINABLE": "YES",
- "IS_BROADCAST": "YES"
- }
- ]
- },
- {
- "description": "query information schema to make sure druid schema is
refreshed",
- "query": {
- "query": "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE
TABLE_NAME = '%%JOIN_DATASOURCE%%'"
- },
- "expectedResults": [
- {
- "EXPR$0": 19
- }
- ]
- }
-]
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/queries/broadcast_join_queries.json
b/integration-tests/src/test/resources/queries/broadcast_join_queries.json
deleted file mode 100644
index ee46b415ad9..00000000000
--- a/integration-tests/src/test/resources/queries/broadcast_join_queries.json
+++ /dev/null
@@ -1,29 +0,0 @@
-[
- {
- "description": "query broadcast join segment directly",
- "query": {
- "query": "SELECT \"%%JOIN_DATASOURCE%%\".\"user\",
SUM(\"%%JOIN_DATASOURCE%%\".\"added\") FROM druid.\"%%JOIN_DATASOURCE%%\" GROUP
BY 1 ORDER BY 2",
- "resultFormat": "OBJECT"
- },
- "expectedResults": [
- {"user":"stringer","EXPR$1":2},
- {"user":"nuclear","EXPR$1":114},
- {"user":"masterYi","EXPR$1":246},
- {"user":"speed","EXPR$1":918},
- {"user":"triplets","EXPR$1":1810}
- ]
- },
- {
- "description": "regular datasource is lhs, broadcast join datasource is
rhs",
- "query": {
- "query": "SELECT \"%%JOIN_DATASOURCE%%\".\"language\" as l1,
\"%%REGULAR_DATASOURCE%%\".\"language\" as l2,
SUM(\"%%JOIN_DATASOURCE%%\".\"sum_added\"),
SUM(\"%%REGULAR_DATASOURCE%%\".\"added\") FROM druid.\"%%REGULAR_DATASOURCE%%\"
INNER JOIN druid.\"%%JOIN_DATASOURCE%%\" ON
\"%%REGULAR_DATASOURCE%%\".\"language\" = \"%%JOIN_DATASOURCE%%\".\"language\"
GROUP BY 1, 2 ORDER BY 3 DESC",
- "resultFormat": "OBJECT"
- },
- "expectedResults": [
- {"l1":"en","l2":"en","EXPR$2":1.372562064E9,"EXPR$3":2.191945776E9},
- {"l1":"zh","l2":"zh","EXPR$2":2.0833281E8,"EXPR$3":9.6017292E7},
- {"l1":"ru","l2":"ru","EXPR$2":6.6673872E7,"EXPR$3":2.19902506E8},
- {"l1":"ja","l2":"ja","EXPR$2":249728.0,"EXPR$3":8.3520802E7}
- ]
- }
-]
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]