This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 067ffe6d055 Move query error and retry integration tests into embedded
suite (#18786)
067ffe6d055 is described below
commit 067ffe6d0555804e2dfd9a57ed3d636accc1ff4c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Dec 3 16:47:53 2025 +0200
Move query error and retry integration tests into embedded suite (#18786)
Description
-----------
This patch continues the effort to move query group tests to the embedded
test suite.
Note
-----
- The queries have been simplified since they just need to verify the error
thrown
- TLS-based tests are not being migrated here.
Key changed classes
---------------------
* `ITQueryErrorTest`
* `QueryErrorTest`
* `ITQueryRetryTestOnMissingSegments`
* `QueryRetryOnMissingSegmentTest`
* `QueryTestBase`
---
.../testing/embedded/query/QueryErrorTest.java | 293 ++++++++++++++++
.../query/QueryRetryOnMissingSegmentsTest.java | 241 +++++++++++++
.../testing/embedded/query/QueryTestBase.java | 153 ++++++++
.../query/ServerManagerForQueryErrorTest.java | 390 +++++++++++++++++++++
.../ServerManagerForQueryErrorTestModule.java | 37 ++
.../query/SqlQueryHttpRequestHeadersTest.java | 134 +------
.../org.apache.druid.initialization.DruidModule | 1 +
.../org.apache.druid.initialization.DruidModule | 2 +-
.../tests/query/ITBroadcastJoinQueryTest.java | 1 -
.../apache/druid/tests/query/ITQueryErrorTest.java | 217 ------------
.../query/ITQueryRetryTestOnMissingSegments.java | 251 -------------
.../native_query_error_from_historicals_test.json | 19 -
.../queries/sql_error_from_historicals_test.json | 9 -
.../resources/queries/sql_plan_failure_query.json | 8 -
.../testing/embedded/EmbeddedDruidCluster.java | 2 +-
15 files changed, 1135 insertions(+), 623 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
new file mode 100644
index 00000000000..e3896700823
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.error.ExceptionMatcher;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY;
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY;
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY;
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY;
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY;
+
+/**
+ * This class tests various query failures.
+ * <p>
+ * - SQL planning failures. Both {@link
org.apache.calcite.sql.parser.SqlParseException}
+ * and {@link org.apache.calcite.tools.ValidationException} are tested using
SQLs that must fail.
+ * - Various query errors from historicals. These tests use {@link
ServerManagerForQueryErrorTest} to make
+ * the query always throw an exception.
+ */
+public class QueryErrorTest extends QueryTestBase
+{
+ // Introduce onAnyRouter(...) and use it; add TLS tests in the follow-up
patches
+ protected String tableName;
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+ indexer.setServerMemory(600_000_000)
+ .addProperty("druid.worker.capacity", "4")
+ .addProperty("druid.processing.numThreads", "2")
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(broker)
+ .addServer(router)
+ .addServer(indexer)
+ .addServer(historical)
+
.addExtension(ServerManagerForQueryErrorTestModule.class);
+ }
+
+ @Override
+ protected void beforeAll()
+ {
+ tableName = EmbeddedClusterApis.createTestDatasourceName();
+ EmbeddedMSQApis msqApi = new EmbeddedMSQApis(cluster, overlord);
+ SqlTaskStatus ingestionStatus = msqApi.submitTaskSql(StringUtils.format(
+ "REPLACE INTO %s\n"
+ + "OVERWRITE ALL\n"
+ + "SELECT CURRENT_TIMESTAMP AS __time, 1 AS d PARTITIONED BY ALL",
+ tableName
+ ));
+
+ cluster.callApi().waitForTaskToSucceed(ingestionStatus.getTaskId(),
overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator,
broker);
+ }
+
+ @Test
+ public void testSqlParseException()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.runSql("FROM foo_bar")
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("400 Bad Request")
+ );
+ }
+
+ @Test
+ public void testSqlValidationException()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.runSql("SELECT * FROM foo_bar")
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("400 Bad Request")
+ );
+ }
+
+ @Test
+ public void testQueryTimeout()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> sqlQueryFuture(b, QUERY_TIMEOUT_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("504")
+ );
+
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> nativeQueryFuture(b, QUERY_TIMEOUT_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("504")
+ );
+ }
+
+ @Test
+ public void testQueryCapacityExceeded()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> sqlQueryFuture(b,
QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("429")
+ );
+
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> nativeQueryFuture(b,
QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("429")
+ );
+ }
+
+ @Test
+ public void testQueryUnsupported()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> sqlQueryFuture(b, QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("501")
+ );
+
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> nativeQueryFuture(b, QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("501")
+ );
+ }
+
+ @Test
+ public void testQueryResourceLimitExceeded()
+ {
+ // SQL
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> sqlQueryFuture(b,
RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("400")
+ );
+
+ // Native
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> nativeQueryFuture(b,
RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("400")
+ );
+ }
+
+ @Test
+ public void testQueryFailure()
+ {
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> sqlQueryFuture(b, QUERY_FAILURE_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("500")
+ );
+
+ MatcherAssert.assertThat(
+ Assertions.assertThrows(
+ Exception.class,
+ () -> cluster.callApi().onAnyBroker(
+ b -> nativeQueryFuture(b, QUERY_FAILURE_TEST_CONTEXT_KEY)
+ )
+ ),
+ ExceptionMatcher.of(HttpResponseException.class)
+ .expectMessageContains("500")
+ );
+ }
+
+ private static Map<String, Object> buildTestContext(String key)
+ {
+ final Map<String, Object> context = new HashMap<>();
+ // Disable cache so that each run hits historical.
+ context.put(QueryContexts.USE_CACHE_KEY, false);
+ context.put(key, true);
+ return context;
+ }
+
+ /**
+ * Set up a SQL query future for the test.
+ */
+ private ListenableFuture<String> sqlQueryFuture(BrokerClient b, String
contextKey)
+ {
+ return b.submitSqlQuery(new ClientSqlQuery(
+ StringUtils.format("SELECT * FROM %s LIMIT 1", tableName),
+ null,
+ false,
+ false,
+ false,
+ buildTestContext(contextKey),
+ List.of()
+ ));
+ }
+
+ /**
+ * Set up a native query future for the test.
+ */
+ private ListenableFuture<String> nativeQueryFuture(BrokerClient b, String
contextKey)
+ {
+ return b.submitNativeQuery(new Druids.ScanQueryBuilder()
+ .dataSource(tableName)
+ .eternityInterval()
+ .limit(1)
+ .context(buildTestContext(contextKey))
+ .build()
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
new file mode 100644
index 00000000000..2e1bdb69465
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY;
+import static
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY;
+
+/**
+ * This class tests the query retry on missing segments. A segment can be
missing in a historical during a query if
+ * the historical drops the segment after the broker issues the query to the
historical. To mimic this case, this
+ * test spawns a historical modified for testing. This historical announces
all segments assigned, but doesn't serve
+ * all of them always. Instead, it can report missing segments for some
segments.
+ */
+public class QueryRetryOnMissingSegmentsTest extends QueryTestBase
+{
+ /**
+ * This enumeration represents an expectation after finishing running the
test query.
+ */
+ private enum Expectation
+ {
+ /**
+ * Expect that the test for a query succeeds and with correct results.
+ */
+ ALL_SUCCESS,
+ /**
+ * Expect that the test query returns the 200 HTTP response, but will
surely return incorrect result.
+ */
+ INCORRECT_RESULT,
+ /**
+ * Expect that the test query must return the 500 HTTP response.
+ */
+ QUERY_FAILURE
+ }
+
+ private ObjectMapper jsonMapper;
+ private String tableName;
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
+ indexer.setServerMemory(400_000_000)
+ .addProperty("druid.worker.capacity", "4")
+ .addProperty("druid.processing.numThreads", "2")
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(broker)
+ .addServer(router)
+ .addServer(indexer)
+ .addServer(historical)
+
.addExtension(ServerManagerForQueryErrorTestModule.class);
+ }
+
+ @Override
+ public void beforeAll()
+ {
+ jsonMapper = overlord.bindings().jsonMapper();
+ tableName = EmbeddedClusterApis.createTestDatasourceName();
+
+ final String taskId = IdUtils.getRandomId();
+ final IndexTask task =
MoreResources.Task.BASIC_INDEX.get().dataSource(tableName).withId(taskId);
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator,
broker);
+ }
+
+ @Test
+ public void testWithRetriesDisabledPartialResultDisallowed()
+ {
+ // Since retry is disabled and a partial result is not allowed, the query
must fail.
+ test(buildQuery(0, false, -1), Expectation.QUERY_FAILURE);
+ }
+
+ @Test
+ public void testWithRetriesDisabledPartialResultAllowed()
+ {
+ // Since retry is disabled but a partial result is allowed, the query must
succeed.
+ // However, the query must return an incorrect result.
+ test(buildQuery(0, true, -1), Expectation.INCORRECT_RESULT);
+ }
+
+ @Test
+ public void testWithRetriesEnabledPartialResultDisallowed()
+ {
+ // Since retry is enabled, the query must succeed even though a partial
result is disallowed.
+ // The retry count is set to 1 since on the first retry of the query (i.e.
second overall try), the historical
+ // will start processing the segment and not call it missing.
+ // The query must return correct results.
+ test(buildQuery(1, false, -1), Expectation.ALL_SUCCESS);
+ }
+
+ @Test
+ public void testFailureWhenLastSegmentIsMissingWithPartialResultsDisallowed()
+ {
+ // Since retry is disabled and a partial result is not allowed, the query
must fail since the last segment
+ // is missing/unavailable.
+ test(buildQuery(0, false, 2), Expectation.QUERY_FAILURE);
+ }
+
+ private void test(ClientSqlQuery clientSqlQuery, Expectation expectation)
+ {
+ int querySuccess = 0;
+ int queryFailure = 0;
+ int resultMatches = 0;
+ int resultMismatches = 0;
+
+ String resultAsJson;
+ try {
+ resultAsJson = cluster.callApi().onAnyBroker(b ->
b.submitSqlQuery(clientSqlQuery));
+ querySuccess++;
+ }
+ catch (Exception e) {
+ queryFailure++;
+ resultAsJson = e.getMessage();
+ }
+
+ if (querySuccess > 0) {
+ List<Map<String, Object>> result = JacksonUtils.readValue(
+ jsonMapper,
+ resultAsJson.getBytes(StandardCharsets.UTF_8),
+ new TypeReference<>()
+ {
+ }
+ );
+
+ if (expectation == Expectation.ALL_SUCCESS) {
+ Assertions.assertEquals(1, result.size());
+ Assertions.assertEquals(10, result.get(0).get("cnt"));
+ resultMatches++;
+ } else if (expectation == Expectation.INCORRECT_RESULT) {
+ // When the result is expected to be incorrect, we just check that the
count is not the expected value.
+ Assertions.assertEquals(1, result.size());
+ Assertions.assertNotEquals(10, result.get(0).get("cnt"));
+ resultMismatches++;
+ }
+ }
+
+ switch (expectation) {
+ case ALL_SUCCESS:
+ Assertions.assertEquals(1, querySuccess);
+ Assertions.assertEquals(0, queryFailure);
+ Assertions.assertEquals(1, resultMatches);
+ Assertions.assertEquals(0, resultMismatches);
+ break;
+ case INCORRECT_RESULT:
+ Assertions.assertEquals(1, querySuccess);
+ Assertions.assertEquals(0, queryFailure);
+ Assertions.assertEquals(0, resultMatches);
+ Assertions.assertEquals(1, resultMismatches);
+ break;
+ case QUERY_FAILURE:
+ Assertions.assertEquals(0, querySuccess);
+ Assertions.assertEquals(1, queryFailure);
+ Assertions.assertEquals(0, resultMatches);
+ Assertions.assertEquals(0, resultMismatches);
+ break;
+ default:
+ throw new ISE("Unknown expectation[%s]", expectation);
+ }
+ }
+
+ private ClientSqlQuery buildQuery(
+ int numRetriesOnMissingSegments,
+ boolean allowPartialResults,
+ int unavailableSegmentIdx
+ )
+ {
+ return new ClientSqlQuery(
+ StringUtils.format("SELECT count(item) as cnt FROM %s", tableName),
+ null,
+ false,
+ false,
+ false,
+ buildContext(
+ numRetriesOnMissingSegments,
+ allowPartialResults,
+ unavailableSegmentIdx
+ ),
+ List.of()
+ );
+ }
+
+ private static Map<String, Object> buildContext(
+ int numRetriesOnMissingSegments,
+ boolean allowPartialResults,
+ int unavailableSegmentIdx
+ )
+ {
+ final Map<String, Object> context = new HashMap<>();
+ // Disable cache so that each run hits historical.
+ context.put(QueryContexts.USE_CACHE_KEY, false);
+ context.put(QueryContexts.USE_RESULT_LEVEL_CACHE_KEY, false);
+ context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
numRetriesOnMissingSegments);
+ context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
+ context.put(QUERY_RETRY_TEST_CONTEXT_KEY, true);
+ context.put(QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY,
unavailableSegmentIdx);
+ return context;
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
new file mode 100644
index 00000000000..9469dba543c
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+public abstract class QueryTestBase extends EmbeddedClusterTestBase
+{
+ protected static final String SQL_QUERY_ROUTE = "%s/druid/v2/sql/";
+ public static List<Boolean> SHOULD_USE_BROKER_TO_QUERY = List.of(true,
false);
+
+ protected final EmbeddedBroker broker = new EmbeddedBroker();
+ protected final EmbeddedRouter router = new EmbeddedRouter();
+ protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ protected final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ protected final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ protected final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+ protected HttpClient httpClientRef;
+ protected String brokerEndpoint;
+ protected String routerEndpoint;
+
+ /**
+ * Hook for the additional setup that needs to be done before all tests.
+ */
+ protected void beforeAll()
+ {
+ // No-op by default
+ }
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
+ indexer.setServerMemory(500_000_000)
+ .addProperty("druid.worker.capacity", "4")
+ .addProperty("druid.processing.numThreads", "2")
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(broker)
+ .addServer(router)
+ .addServer(indexer)
+ .addServer(historical);
+ }
+
+ @BeforeAll
+ void setUp()
+ {
+ httpClientRef = router.bindings().globalHttpClient();
+ brokerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(broker));
+ routerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(router));
+ try {
+ beforeAll();
+ }
+ catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ /**
+ * Execute a SQL query against the given endpoint via the HTTP client.
+ */
+ protected void executeQuery(
+ String endpoint,
+ String contentType,
+ String query,
+ Consumer<Request> onRequest,
+ BiConsumer<Integer, String> onResponse
+ )
+ {
+ URL url;
+ try {
+ url = new URL(endpoint);
+ }
+ catch (MalformedURLException e) {
+ throw new AssertionError("Malformed URL");
+ }
+
+ Request request = new Request(HttpMethod.POST, url);
+ if (contentType != null) {
+ request.addHeader("Content-Type", contentType);
+ }
+
+ if (query != null) {
+ request.setContent(query.getBytes(StandardCharsets.UTF_8));
+ }
+
+ if (onRequest != null) {
+ onRequest.accept(request);
+ }
+
+ StatusResponseHolder response;
+ try {
+ response = httpClientRef.go(request, StatusResponseHandler.getInstance())
+ .get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new AssertionError("Failed to execute a request", e);
+ }
+
+ Assertions.assertNotNull(response);
+
+ onResponse.accept(
+ response.getStatus().getCode(),
+ response.getContent().trim()
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
new file mode 100644
index 00000000000..de0c62d52a9
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.guava.Accumulator;
+import org.apache.druid.java.util.common.guava.FunctionalIterable;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSegmentAndDescriptor;
+import org.apache.druid.query.LeafSegmentsBundle;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryUnsupportedException;
+import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.planning.ExecutionVertex;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.ServerManager;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This server manager is designed to test various query failures.
+ * <ul>
+ * <li> Missing segments. A segment can be missing during a query if a
historical drops the segment
+ * after the broker issues the query to the historical. To mimic this
situation, the historical
+ * with this server manager announces all segments assigned, and reports
missing segments based on the following:
+ * <ul>
+ * <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} and {@link
#QUERY_RETRY_TEST_CONTEXT_KEY} are set,
+ * the segment at that index is reported as missing exactly
once.</li>
+ * <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} is not set or
is -1, it simulates missing segments
+ * starting from the beginning, up to {@link
#MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS}.</li>
+ * </ul>
+ * The missing report is only generated once for the first time. Post that
report, upon retry, all segments are served
+ * for the datasource. See ITQueryRetryTestOnMissingSegments. </li>
+ * <li> Other query errors. This server manager returns a sequence that always
throws an exception
+ * based on a given query context value. See ITQueryErrorTest. </li>
+ * </ul>
+ *
+ * @see org.apache.druid.query.RetryQueryRunner for query retrying.
+ * @see org.apache.druid.client.JsonParserIterator for handling query errors
from historicals.
+ */
+public class ServerManagerForQueryErrorTest extends ServerManager
+{
+ // Query context key that indicates this query is for query retry testing.
+ public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
+ public static final String QUERY_TIMEOUT_TEST_CONTEXT_KEY =
"query-timeout-test";
+ public static final String QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY =
"query-capacity-exceeded-test";
+ public static final String QUERY_UNSUPPORTED_TEST_CONTEXT_KEY =
"query-unsupported-test";
+ public static final String RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY =
"resource-limit-exceeded-test";
+ public static final String QUERY_FAILURE_TEST_CONTEXT_KEY =
"query-failure-test";
+ /**
+ * Query context that indicates which segment should be marked as
unavailable/missing.
+ * This should be used in conjunction with {@link
#QUERY_RETRY_TEST_CONTEXT_KEY}.
+ * <p>
+ * A value of {@code 0} means the first segment will be reported as missing,
{@code 1} for the second, and so on.
+ * If this key is not set (default = -1), the test will instead simulate
missing up to
+ * {@link #MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS} segments from the
beginning.
+ * </p>
+ */
+ public static final String QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY =
"unavailable-segment-idx";
+ private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 1;
+
+ private static final Logger LOG = new
Logger(ServerManagerForQueryErrorTest.class);
+
+ private final ConcurrentHashMap<String, Integer> queryToIgnoredSegments =
new ConcurrentHashMap<>();
+
+ @Inject
+ public ServerManagerForQueryErrorTest(
+ QueryRunnerFactoryConglomerate conglomerate,
+ ServiceEmitter emitter,
+ QueryProcessingPool queryProcessingPool,
+ CachePopulator cachePopulator,
+ @Smile ObjectMapper objectMapper,
+ Cache cache,
+ CacheConfig cacheConfig,
+ SegmentManager segmentManager,
+ ServerConfig serverConfig
+ )
+ {
+ super(
+ conglomerate,
+ emitter,
+ queryProcessingPool,
+ cachePopulator,
+ objectMapper,
+ cache,
+ cacheConfig,
+ segmentManager,
+ serverConfig,
+ NoopPolicyEnforcer.instance()
+ );
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query,
Iterable<SegmentDescriptor> specs)
+ {
+ final ExecutionVertex ev = ExecutionVertex.of(query);
+ final Optional<VersionedIntervalTimeline<String, DataSegment>>
maybeTimeline =
+ segmentManager.getTimeline(ev.getBaseTableDataSource());
+ if (maybeTimeline.isEmpty()) {
+ return (queryPlus, responseContext) -> {
+ responseContext.addMissingSegments(Lists.newArrayList(specs));
+ return Sequences.empty();
+ };
+ }
+
+ final QueryRunnerFactory<T, Query<T>> factory =
getQueryRunnerFactory(query);
+ final QueryToolChest<T, Query<T>> toolChest = getQueryToolChest(query,
factory);
+ final VersionedIntervalTimeline<String, DataSegment> timeline =
maybeTimeline.get();
+
+ return new ResourceManagingQueryRunner<>(timeline, factory, toolChest, ev,
specs)
+ {
+ @Override
+ protected LeafSegmentsBundle getLeafSegmentsBundle(Query<T> query,
SegmentMapFunction segmentMapFunction)
+ {
+ // override this method so that we can artificially create missing
segments
+ final List<DataSegmentAndDescriptor> segments = new ArrayList<>();
+ final ArrayList<SegmentDescriptor> missingSegments = new ArrayList<>();
+ final ArrayList<SegmentReference> segmentReferences = new
ArrayList<>();
+ final ArrayList<DataSegmentAndDescriptor> loadableSegments = new
ArrayList<>();
+
+ final QueryContext queryContext = query.context();
+
+ for (SegmentDescriptor descriptor : specs) {
+ final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
+ if (queryContext.getBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
+ final int unavailableSegmentIdx =
queryContext.getInt(QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY, -1);
+ queryToIgnoredSegments.compute(
+ query.getMostSpecificId(),
+ (queryId, ignoreCounter) -> {
+ if (ignoreCounter == null) {
+ ignoreCounter = 0;
+ }
+
+ if (unavailableSegmentIdx >= 0 && unavailableSegmentIdx ==
ignoreCounter) {
+ // Fail exactly once when counter matches the configured
retry index
+ ignoreCounter++;
+ isIgnoreSegment.setTrue();
+ } else if (ignoreCounter <
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+ // Fail up to N times for this query
+ ignoreCounter++;
+ isIgnoreSegment.setTrue();
+ }
+ return ignoreCounter;
+ }
+ );
+
+ if (isIgnoreSegment.isTrue()) {
+ LOG.info(
+ "Pretending I don't have segment[%s]",
+ descriptor
+ );
+ missingSegments.add(descriptor);
+ continue;
+ }
+ }
+
+ final PartitionChunk<DataSegment> chunk = timeline.findChunk(
+ descriptor.getInterval(),
+ descriptor.getVersion(),
+ descriptor.getPartitionNumber()
+ );
+
+ if (chunk != null) {
+ segments.add(new DataSegmentAndDescriptor(chunk.getObject(),
descriptor));
+ } else {
+ missingSegments.add(descriptor);
+ }
+ }
+
+ // inlined logic of ServerManager.getSegmentsBundle
+ for (DataSegmentAndDescriptor segment : segments) {
+ final DataSegment dataSegment = segment.getDataSegment();
+ if (dataSegment == null) {
+ missingSegments.add(segment.getDescriptor());
+ continue;
+ }
+ Optional<Segment> ref =
segmentManager.acquireCachedSegment(dataSegment);
+ if (ref.isPresent()) {
+ segmentReferences.add(
+ new SegmentReference(
+ segment.getDescriptor(),
+ segmentMapFunction.apply(ref),
+ null
+ )
+ );
+ } else if (segmentManager.canLoadSegmentOnDemand(dataSegment)) {
+ loadableSegments.add(segment);
+ } else {
+ missingSegments.add(segment.getDescriptor());
+ }
+ }
+ return new LeafSegmentsBundle(segmentReferences, loadableSegments,
missingSegments);
+ }
+ };
+ }
+
+ @Override
+ protected <T> FunctionalIterable<QueryRunner<T>> getQueryRunnersForSegments(
+ Query<T> query,
+ QueryRunnerFactory<T, Query<T>> factory,
+ QueryToolChest<T, Query<T>> toolChest,
+ List<SegmentReference> segmentReferences,
+ AtomicLong cpuTimeAccumulator,
+ Optional<byte[]> cacheKeyPrefix
+ )
+ {
+ return FunctionalIterable
+ .create(segmentReferences)
+ .transform(
+ ref ->
+ ref.getSegmentReference()
+ .map(segment -> {
+ final QueryContext queryContext = query.context();
+ if
(queryContext.getBoolean(QUERY_TIMEOUT_TEST_CONTEXT_KEY, false)) {
+ return (QueryRunner<T>) (queryPlus,
responseContext) -> new Sequence<>()
+ {
+ @Override
+ public <OutType> OutType accumulate(
+ OutType initValue,
+ Accumulator<OutType, T> accumulator
+ )
+ {
+ throw new QueryTimeoutException("query timeout
test");
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw new QueryTimeoutException("query timeout
test");
+ }
+ };
+ } else if
(queryContext.getBoolean(QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY, false)) {
+ return (QueryRunner<T>) (queryPlus,
responseContext) -> new Sequence<>()
+ {
+ @Override
+ public <OutType> OutType accumulate(
+ OutType initValue,
+ Accumulator<OutType, T> accumulator
+ )
+ {
+ throw
QueryCapacityExceededException.withErrorMessageAndResolvedHost(
+ "query capacity exceeded test"
+ );
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw
QueryCapacityExceededException.withErrorMessageAndResolvedHost(
+ "query capacity exceeded test"
+ );
+ }
+ };
+ } else if
(queryContext.getBoolean(QUERY_UNSUPPORTED_TEST_CONTEXT_KEY, false)) {
+ return (QueryRunner<T>) (queryPlus,
responseContext) -> new Sequence<>()
+ {
+ @Override
+ public <OutType> OutType accumulate(
+ OutType initValue,
+ Accumulator<OutType, T> accumulator
+ )
+ {
+ throw new QueryUnsupportedException("query
unsupported test");
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw new QueryUnsupportedException("query
unsupported test");
+ }
+ };
+ } else if
(queryContext.getBoolean(RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY, false)) {
+ return (QueryRunner<T>) (queryPlus,
responseContext) -> new Sequence<>()
+ {
+ @Override
+ public <OutType> OutType accumulate(
+ OutType initValue,
+ Accumulator<OutType, T> accumulator
+ )
+ {
+ throw new
ResourceLimitExceededException("resource limit exceeded test");
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw new
ResourceLimitExceededException("resource limit exceeded test");
+ }
+ };
+ } else if
(queryContext.getBoolean(QUERY_FAILURE_TEST_CONTEXT_KEY, false)) {
+ return (QueryRunner<T>) (queryPlus,
responseContext) -> new Sequence<>()
+ {
+ @Override
+ public <OutType> OutType accumulate(
+ OutType initValue,
+ Accumulator<OutType, T> accumulator
+ )
+ {
+ throw new RuntimeException("query failure
test");
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ OutType initValue,
+ YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ throw new RuntimeException("query failure
test");
+ }
+ };
+ }
+
+ return buildQueryRunnerForSegment(
+ ref.getSegmentDescriptor(),
+ segment,
+ factory,
+ toolChest,
+ cpuTimeAccumulator,
+ cacheKeyPrefix
+ );
+ }
+ ).orElseThrow(
+ () -> DruidException.defensive("Unexpected missing
segment[%s]", ref.getSegmentDescriptor())
+ )
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
new file mode 100644
index 00000000000..8e4b71a3bda
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.google.inject.Binder;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.server.ServerManager;
+
+@LoadScope(roles = NodeRole.HISTORICAL_JSON_NAME)
+public class ServerManagerForQueryErrorTestModule implements DruidModule
+{
+ @Override
+ public void configure(Binder binder)
+ {
+
binder.bind(ServerManager.class).to(ServerManagerForQueryErrorTest.class).in(LazySingleton.class);
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
index 5914001163a..3ce50da68da 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
@@ -19,120 +19,23 @@
package org.apache.druid.testing.embedded.query;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.testing.embedded.EmbeddedBroker;
-import org.apache.druid.testing.embedded.EmbeddedCoordinator;
-import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
-import org.apache.druid.testing.embedded.EmbeddedIndexer;
-import org.apache.druid.testing.embedded.EmbeddedOverlord;
-import org.apache.druid.testing.embedded.EmbeddedRouter;
-import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.FieldSource;
import javax.ws.rs.core.MediaType;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
/**
* Suite to test various Content-Type headers attached
* to SQL query HTTP requests to brokers and routers.
*/
-public class SqlQueryHttpRequestHeadersTest extends EmbeddedClusterTestBase
+public class SqlQueryHttpRequestHeadersTest extends QueryTestBase
{
- private static final String SQL_QUERY_ROUTE = "%s/druid/v2/sql/";
-
- public static List<Boolean> shouldUseQueryBroker = List.of(true, false);
-
- private final EmbeddedBroker broker = new EmbeddedBroker();
- private final EmbeddedRouter router = new EmbeddedRouter();
-
- private HttpClient httpClientRef;
- private String brokerEndpoint;
- private String routerEndpoint;
-
- @Override
- protected EmbeddedDruidCluster createCluster()
- {
- return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
- .useLatchableEmitter()
- .addServer(new EmbeddedOverlord())
- .addServer(new EmbeddedCoordinator())
- .addServer(broker)
- .addServer(router)
- .addServer(new EmbeddedIndexer());
- }
-
- @BeforeEach
- void setUp()
- {
- httpClientRef = router.bindings().globalHttpClient();
- brokerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(broker));
- routerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(router));
- }
-
- private void executeQuery(
- String endpoint,
- String contentType,
- String query,
- Consumer<Request> onRequest,
- BiConsumer<Integer, String> onResponse
- )
- {
- URL url;
- try {
- url = new URL(endpoint);
- }
- catch (MalformedURLException e) {
- throw new AssertionError("Malformed URL");
- }
-
- Request request = new Request(HttpMethod.POST, url);
- if (contentType != null) {
- request.addHeader("Content-Type", contentType);
- }
-
- if (query != null) {
- request.setContent(query.getBytes(StandardCharsets.UTF_8));
- }
-
- if (onRequest != null) {
- onRequest.accept(request);
- }
-
- StatusResponseHolder response;
- try {
- response = httpClientRef.go(request, StatusResponseHandler.getInstance())
- .get();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new AssertionError("Failed to execute a request", e);
- }
-
- Assertions.assertNotNull(response);
-
- onResponse.accept(
- response.getStatus().getCode(),
- response.getContent().trim()
- );
- }
-
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testNullContentType(boolean shouldQueryBroker)
{
executeQuery(
@@ -141,14 +44,12 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
"select 1",
(request) -> {
},
- (statusCode, responseBody) -> {
- Assertions.assertEquals(statusCode,
HttpResponseStatus.BAD_REQUEST.getCode());
- }
+ (statusCode, responseBody) -> Assertions.assertEquals(statusCode,
HttpResponseStatus.BAD_REQUEST.getCode())
);
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testUnsupportedContentType(boolean shouldQueryBroker)
{
executeQuery(
@@ -157,14 +58,15 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
"select 1",
(request) -> {
},
- (statusCode, responseBody) -> {
- Assertions.assertEquals(statusCode,
HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.getCode());
- }
+ (statusCode, responseBody) -> Assertions.assertEquals(
+ statusCode,
+ HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.getCode()
+ )
);
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testTextPlain(boolean shouldQueryBroker)
{
executeQuery(
@@ -181,7 +83,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testFormURLEncoded(boolean shouldQueryBroker)
{
executeQuery(
@@ -198,7 +100,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testFormURLEncoded_InvalidEncoding(boolean shouldQueryBroker)
{
executeQuery(
@@ -215,7 +117,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testJSON(boolean shouldQueryBroker)
{
executeQuery(
@@ -244,7 +146,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testInvalidJSONFormat(boolean shouldQueryBroker)
{
executeQuery(
@@ -261,7 +163,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testEmptyQuery_TextPlain(boolean shouldQueryBroker)
{
executeQuery(
@@ -278,7 +180,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testEmptyQuery_UrlEncoded(boolean shouldQueryBroker)
{
executeQuery(
@@ -295,7 +197,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testBlankQuery_TextPlain(boolean shouldQueryBroker)
{
executeQuery(
@@ -312,7 +214,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testEmptyQuery_JSON(boolean shouldQueryBroker)
{
executeQuery(
@@ -329,7 +231,7 @@ public class SqlQueryHttpRequestHeadersTest extends
EmbeddedClusterTestBase
}
@ParameterizedTest
- @FieldSource("shouldUseQueryBroker")
+ @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
public void testMultipleContentType_usesFirstOne(boolean shouldQueryBroker)
{
executeQuery(
diff --git
a/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
b/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index c38b51514ea..12fa4663f10 100644
---
a/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++
b/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -17,4 +17,5 @@
# under the License.
#
+org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTestModule
org.apache.druid.testing.embedded.gcs.GoogleStorageTestModule
diff --git
a/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
b/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 3703390abef..3559697aea5 100644
---
a/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++
b/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.druid.testing.tools.CustomNodeRoleClientModule
+org.apache.druid.testing.tools.CustomNodeRoleClientModule
\ No newline at end of file
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
index c4b7229b2f9..3e5e160c173 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
@@ -50,7 +50,6 @@ public class ITBroadcastJoinQueryTest extends
AbstractIndexerTest
private static final String BROADCAST_JOIN_QUERIES_RESOURCE =
"/queries/broadcast_join_queries.json";
private static final String BROADCAST_JOIN_DATASOURCE =
"broadcast_join_wikipedia_test";
-
@Inject
ServerDiscoveryFactory factory;
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
deleted file mode 100644
index 7322a32eb5a..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ServerManagerForQueryErrorTest;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.SqlTestQueryHelper;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class tests various query failures.
- *
- * - SQL planning failures. Both {@link
org.apache.calcite.sql.parser.SqlParseException}
- * and {@link org.apache.calcite.tools.ValidationException} are tested using
SQLs that must fail.
- * - Various query errors from historicals. These tests use {@link
ServerManagerForQueryErrorTest} to make
- * the query to always throw an exception. They verify the error code
returned by
- * {@link org.apache.druid.sql.http.SqlResource} and {@link
org.apache.druid.server.QueryResource}.
- */
-@Test(groups = TestNGGroup.QUERY_ERROR)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITQueryErrorTest
-{
- private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
- /**
- * A simple query used for error tests from historicals. What query is does
not matter because the query is always
- * expected to fail.
- *
- * @see ServerManagerForQueryErrorTest#buildQueryRunnerForSegment
- */
- private static final String NATIVE_QUERY_RESOURCE =
- "/queries/native_query_error_from_historicals_test.json";
- private static final String SQL_QUERY_RESOURCE =
- "/queries/sql_error_from_historicals_test.json";
- /**
- * A simple sql query template used for plan failure tests.
- */
- private static final String SQL_PLAN_FAILURE_RESOURCE =
"/queries/sql_plan_failure_query.json";
-
- @Inject
- private DataLoaderHelper dataLoaderHelper;
- @Inject
- private TestQueryHelper queryHelper;
- @Inject
- private SqlTestQueryHelper sqlHelper;
- @Inject
- private ObjectMapper jsonMapper;
-
- @BeforeMethod
- public void before()
- {
- // ensure that wikipedia segments are loaded completely
- dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*400.*")
- public void testSqlParseException() throws Exception
- {
- // test a sql without SELECT
- sqlHelper.testQueriesFromString(buildSqlPlanFailureQuery("FROM t WHERE col
= 'a'"));
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*400.*")
- public void testSqlValidationException() throws Exception
- {
- // test a sql that selects unknown column
- sqlHelper.testQueriesFromString(
- buildSqlPlanFailureQuery(StringUtils.format("SELECT unknown_col FROM
%s LIMIT 1", WIKIPEDIA_DATA_SOURCE))
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*504.*")
- public void testSqlTimeout() throws Exception
- {
- sqlHelper.testQueriesFromString(
-
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*429.*")
- public void testSqlCapacityExceeded() throws Exception
- {
- sqlHelper.testQueriesFromString(
-
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*501.*")
- public void testSqlUnsupported() throws Exception
- {
- sqlHelper.testQueriesFromString(
-
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*400.*")
- public void testSqlResourceLimitExceeded() throws Exception
- {
- sqlHelper.testQueriesFromString(
-
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*500.*")
- public void testSqlFailure() throws Exception
- {
- sqlHelper.testQueriesFromString(
-
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*504.*")
- public void testQueryTimeout() throws Exception
- {
- queryHelper.testQueriesFromString(
-
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*429.*")
- public void testQueryCapacityExceeded() throws Exception
- {
- queryHelper.testQueriesFromString(
-
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*501.*")
- public void testQueryUnsupported() throws Exception
- {
- queryHelper.testQueriesFromString(
-
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*400.*")
- public void testResourceLimitExceeded() throws Exception
- {
- queryHelper.testQueriesFromString(
-
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
- );
- }
-
- @Test(expectedExceptions = {RuntimeException.class},
expectedExceptionsMessageRegExp = "(?s).*500.*")
- public void testQueryFailure() throws Exception
- {
- queryHelper.testQueriesFromString(
-
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
- );
- }
-
- private String buildSqlPlanFailureQuery(String sql) throws IOException
- {
- return StringUtils.replace(
- AbstractIndexerTest.getResourceAsString(SQL_PLAN_FAILURE_RESOURCE),
- "%%QUERY%%",
- sql
- );
- }
-
- private String buildHistoricalErrorSqlQuery(String contextKey) throws
IOException
- {
- return StringUtils.replace(
- AbstractIndexerTest.getResourceAsString(SQL_QUERY_RESOURCE),
- "%%CONTEXT%%",
- jsonMapper.writeValueAsString(buildTestContext(contextKey))
- );
- }
-
- private String buildHistoricalErrorTestQuery(String contextKey) throws
IOException
- {
- return StringUtils.replace(
- AbstractIndexerTest.getResourceAsString(NATIVE_QUERY_RESOURCE),
- "%%CONTEXT%%",
- jsonMapper.writeValueAsString(buildTestContext(contextKey))
- );
- }
-
- private static Map<String, Object> buildTestContext(String key)
- {
- final Map<String, Object> context = new HashMap<>();
- // Disable cache so that each run hits historical.
- context.put(QueryContexts.USE_CACHE_KEY, false);
- context.put(key, true);
- return context;
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
deleted file mode 100644
index eb7bcacdb13..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.clients.QueryResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.ServerManagerForQueryErrorTest;
-import org.apache.druid.testing.utils.QueryResultVerifier;
-import org.apache.druid.testing.utils.QueryWithResults;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class tests the query retry on missing segments. A segment can be
missing in a historical during a query if
- * the historical drops the segment after the broker issues the query to the
historical. To mimic this case, this
- * test spawns a historical modified for testing. This historical announces
all segments assigned, but doesn't serve
- * all of them always. Instead, it can report missing segments for some
- * segments. See {@link ServerManagerForQueryErrorTest} for more details.
- * <p>
- * To run this test properly, the test group must be specified as {@link
TestNGGroup#QUERY_RETRY}.
- */
-@Test(groups = TestNGGroup.QUERY_RETRY)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITQueryRetryTestOnMissingSegments
-{
- private static final String TWITTERSTREAM_DATA_SOURCE = "twitterstream";
- private static final String QUERIES_RESOURCE =
"/queries/twitterstream_queries_query_retry_test.json";
-
- /**
- * This enumeration represents an expectation after finishing running the
test query.
- */
- private enum Expectation
- {
- /**
- * Expect that the test query succeed and with correct results.
- */
- ALL_SUCCESS,
- /**
- * Expect that the test query returns the 200 HTTP response, but will
surely return incorrect result.
- */
- INCORRECT_RESULT,
- /**
- * Expect that the test query must return the 500 HTTP response.
- */
- QUERY_FAILURE
- }
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
- @Inject
- private TestQueryHelper queryHelper;
- @Inject
- private QueryResourceTestClient queryClient;
- @Inject
- private IntegrationTestingConfig config;
- @Inject
- private ObjectMapper jsonMapper;
-
- @BeforeMethod
- public void before()
- {
- // ensure that twitterstream segments are loaded completely
- ITRetryUtil.retryUntilTrue(
- () -> coordinatorClient.areSegmentsLoaded(TWITTERSTREAM_DATA_SOURCE),
"twitterstream segments load"
- );
- }
-
- @Test
- public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
- {
- // Since retry is disabled and partial result is not allowed, the query
must fail.
- testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
- }
-
- @Test
- public void testWithRetriesDisabledPartialResultAllowed() throws Exception
- {
- // Since retry is disabled but partial result is allowed, the query must
succeed.
- // However, the query must return incorrect result.
- testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
- }
-
- @Test
- public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
- {
- // Since retry is enabled, the query must succeed even though partial
result is disallowed.
- // The retry count is set to 1 since on the first retry of the query (i.e
second overall try), the historical
- // will start processing the segment and not call it missing.
- // The query must return correct results.
- testQueries(buildQuery(1, false), Expectation.ALL_SUCCESS);
- }
-
- @Test
- public void
testFailureWhenLastSegmentIsMissingWithPartialResultsDisallowed() throws
Exception
- {
- // Since retry is disabled and partial result is not allowed, the query
must fail since the last segment
- // is missing/unavailable.
- testQueries(buildQuery(0, false, 2), Expectation.QUERY_FAILURE);
- }
-
- private void testQueries(String queryWithResultsStr, Expectation
expectation) throws Exception
- {
- final List<QueryWithResults> queries = jsonMapper.readValue(
- queryWithResultsStr,
- new TypeReference<>() {}
- );
- testQueries(queries, expectation);
- }
-
- private void testQueries(List<QueryWithResults> queries, Expectation
expectation) throws Exception
- {
- int querySuccess = 0;
- int queryFailure = 0;
- int resultMatches = 0;
- int resultMismatches = 0;
-
- for (QueryWithResults queryWithResult : queries) {
- final StatusResponseHolder responseHolder = queryClient
- .queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()),
queryWithResult.getQuery())
- .get();
-
- if (responseHolder.getStatus().getCode() ==
HttpResponseStatus.OK.getCode()) {
- querySuccess++;
-
- List<Map<String, Object>> result = jsonMapper.readValue(
- responseHolder.getContent(),
- new TypeReference<>() {}
- );
- if (!QueryResultVerifier.compareResults(
- result,
- queryWithResult.getExpectedResults(),
- queryWithResult.getFieldsToTest()
- ).isSuccess()) {
- if (expectation != Expectation.INCORRECT_RESULT) {
- throw new ISE(
- "Incorrect query results for query %s \n expectedResults: %s
\n actualResults : %s",
- queryWithResult.getQuery(),
-
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
- jsonMapper.writeValueAsString(result)
- );
- } else {
- resultMismatches++;
- }
- } else {
- resultMatches++;
- }
- } else if (responseHolder.getStatus().getCode() ==
HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
- expectation == Expectation.QUERY_FAILURE) {
- final Map<String, Object> response =
jsonMapper.readValue(responseHolder.getContent(), Map.class);
- final String errorMessage = (String) response.get("errorMessage");
- Assert.assertNotNull(errorMessage, "errorMessage");
- Assert.assertTrue(errorMessage.contains("No results found for
segments"));
- queryFailure++;
- } else {
- throw new ISE(
- "Unexpected failure, code: [%s], content: [%s]",
- responseHolder.getStatus(),
- responseHolder.getContent()
- );
- }
- }
-
- switch (expectation) {
- case ALL_SUCCESS:
- Assert.assertEquals(querySuccess, 1);
- Assert.assertEquals(queryFailure, 0);
- Assert.assertEquals(resultMatches, 1);
- Assert.assertEquals(resultMismatches, 0);
- break;
- case QUERY_FAILURE:
- Assert.assertEquals(querySuccess, 0);
- Assert.assertEquals(queryFailure, 1);
- Assert.assertEquals(resultMatches, 0);
- Assert.assertEquals(resultMismatches, 0);
- break;
- case INCORRECT_RESULT:
- Assert.assertEquals(querySuccess, 1);
- Assert.assertEquals(queryFailure, 0);
- Assert.assertEquals(resultMatches, 0);
- Assert.assertEquals(resultMismatches, 1);
- break;
- default:
- throw new ISE("Unknown expectation[%s]", expectation);
- }
- }
-
- private String buildQuery(int numRetriesOnMissingSegments, boolean
allowPartialResults) throws IOException
- {
- return buildQuery(numRetriesOnMissingSegments, allowPartialResults, -1);
- }
-
- private String buildQuery(int numRetriesOnMissingSegments, boolean
allowPartialResults, int unavailableSegmentIdx) throws IOException
- {
- return StringUtils.replace(
- AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
- "%%CONTEXT%%",
-
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments,
allowPartialResults, unavailableSegmentIdx))
- );
- }
-
- private static Map<String, Object> buildContext(int
numRetriesOnMissingSegments, boolean allowPartialResults, int
unavailableSegmentIdx)
- {
- final Map<String, Object> context = new HashMap<>();
- // Disable cache so that each run hits historical.
- context.put(QueryContexts.USE_CACHE_KEY, false);
- context.put(QueryContexts.USE_RESULT_LEVEL_CACHE_KEY, false);
- context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
numRetriesOnMissingSegments);
- context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
- context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY,
true);
-
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY,
unavailableSegmentIdx);
- return context;
- }
-}
diff --git
a/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
b/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
deleted file mode 100644
index 92b02a83be8..00000000000
---
a/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
- {
- "description": "timeseries, 1 agg, all",
- "query": {
- "queryType": "timeseries",
- "dataSource": "wikipedia_editstream",
- "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- }
- ],
- "context": %%CONTEXT%%
- },
- "expectedResults": []
- }
-]
diff --git
a/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
b/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
deleted file mode 100644
index d90441f855d..00000000000
---
a/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
+++ /dev/null
@@ -1,9 +0,0 @@
-[
- {
- "query": {
- "query": "SELECT count(*) from wikipedia_editstream",
- "context": %%CONTEXT%%
- },
- "expectedResults": []
- }
-]
\ No newline at end of file
diff --git
a/integration-tests/src/test/resources/queries/sql_plan_failure_query.json
b/integration-tests/src/test/resources/queries/sql_plan_failure_query.json
deleted file mode 100644
index dd0fbc6f1f4..00000000000
--- a/integration-tests/src/test/resources/queries/sql_plan_failure_query.json
+++ /dev/null
@@ -1,8 +0,0 @@
-[
- {
- "query": {
- "query": "%%QUERY%%"
- },
- "expectedResults": []
- }
-]
\ No newline at end of file
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 66178c05895..182d11905d5 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -149,7 +149,7 @@ public class EmbeddedDruidCluster implements
EmbeddedResource
* Configures this cluster to use the given default timeout with the
* {@link LatchableEmitter}. Slow tests may set a higher value for the
timeout
* to avoid failures. If the cluster does not {@link #useLatchableEmitter()},
- * this method has no effect. Default timeout is 10 seconds.
+ * this method has no effect. The default timeout is 10 seconds.
*/
public EmbeddedDruidCluster useDefaultTimeoutForLatchableEmitter(long
timeoutSeconds)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]