This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 067ffe6d055 Move query error and retry integration tests into embedded 
suite (#18786)
067ffe6d055 is described below

commit 067ffe6d0555804e2dfd9a57ed3d636accc1ff4c
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Wed Dec 3 16:47:53 2025 +0200

    Move query error and retry integration tests into embedded suite (#18786)
    
    Description
    -----------
    This patch continues the effort to move query group tests to the embedded 
test suite.
    
    Note
    -----
    - The queries have been simplified since they just need to verify the error 
thrown
    - TLS-based tests are not being migrated here.
    
    Key changed classes
    ---------------------
     * `ITQueryErrorTest`
     * `QueryErrorTest`
     * `ITQueryRetryTestOnMissingSegments`
     * `QueryRetryOnMissingSegmentsTest`
     * `QueryTestBase`
---
 .../testing/embedded/query/QueryErrorTest.java     | 293 ++++++++++++++++
 .../query/QueryRetryOnMissingSegmentsTest.java     | 241 +++++++++++++
 .../testing/embedded/query/QueryTestBase.java      | 153 ++++++++
 .../query/ServerManagerForQueryErrorTest.java      | 390 +++++++++++++++++++++
 .../ServerManagerForQueryErrorTestModule.java      |  37 ++
 .../query/SqlQueryHttpRequestHeadersTest.java      | 134 +------
 .../org.apache.druid.initialization.DruidModule    |   1 +
 .../org.apache.druid.initialization.DruidModule    |   2 +-
 .../tests/query/ITBroadcastJoinQueryTest.java      |   1 -
 .../apache/druid/tests/query/ITQueryErrorTest.java | 217 ------------
 .../query/ITQueryRetryTestOnMissingSegments.java   | 251 -------------
 .../native_query_error_from_historicals_test.json  |  19 -
 .../queries/sql_error_from_historicals_test.json   |   9 -
 .../resources/queries/sql_plan_failure_query.json  |   8 -
 .../testing/embedded/EmbeddedDruidCluster.java     |   2 +-
 15 files changed, 1135 insertions(+), 623 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
new file mode 100644
index 00000000000..e3896700823
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryErrorTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.error.ExceptionMatcher;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY;
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY;
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY;
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY;
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY;
+
+/**
+ * Verifies that query failures are reported to the client as an
+ * {@link HttpResponseException} whose message contains the expected HTTP status.
+ * <ul>
+ * <li>SQL planning failures. Both {@link org.apache.calcite.sql.parser.SqlParseException}
+ * and {@link org.apache.calcite.tools.ValidationException} are tested using SQLs that must fail.</li>
+ * <li>Various query errors from historicals. These tests use {@link ServerManagerForQueryErrorTest}
+ * to make the query always throw an exception.</li>
+ * </ul>
+ */
+public class QueryErrorTest extends QueryTestBase
+{
+  // TODO: Introduce onAnyRouter(...) and use it; add TLS tests in the follow-up patches
+  protected String tableName;
+
+  /**
+   * Builds a cluster with one server of each type and registers
+   * {@link ServerManagerForQueryErrorTestModule} as an extension.
+   */
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    indexer.setServerMemory(600_000_000)
+           .addProperty("druid.worker.capacity", "4")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(overlord)
+                               .addServer(coordinator)
+                               .addServer(broker)
+                               .addServer(router)
+                               .addServer(indexer)
+                               .addServer(historical)
+                               .addExtension(ServerManagerForQueryErrorTestModule.class);
+  }
+
+  /**
+   * Ingests a single-row datasource through an MSQ {@code REPLACE} task and waits
+   * until its segments are available, so that the test queries have a table to hit.
+   */
+  @Override
+  protected void beforeAll()
+  {
+    tableName = EmbeddedClusterApis.createTestDatasourceName();
+    EmbeddedMSQApis msqApi = new EmbeddedMSQApis(cluster, overlord);
+    SqlTaskStatus ingestionStatus = msqApi.submitTaskSql(StringUtils.format(
+        "REPLACE INTO %s\n"
+        + "OVERWRITE ALL\n"
+        + "SELECT CURRENT_TIMESTAMP AS __time, 1 AS d PARTITIONED BY ALL",
+        tableName
+    ));
+
+    cluster.callApi().waitForTaskToSucceed(ingestionStatus.getTaskId(), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator, broker);
+  }
+
+  @Test
+  public void testSqlParseException()
+  {
+    assertSqlPlanningFails("FROM foo_bar");
+  }
+
+  @Test
+  public void testSqlValidationException()
+  {
+    assertSqlPlanningFails("SELECT * FROM foo_bar");
+  }
+
+  @Test
+  public void testQueryTimeout()
+  {
+    assertSqlAndNativeQueriesFail(QUERY_TIMEOUT_TEST_CONTEXT_KEY, "504");
+  }
+
+  @Test
+  public void testQueryCapacityExceeded()
+  {
+    assertSqlAndNativeQueriesFail(QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY, "429");
+  }
+
+  @Test
+  public void testQueryUnsupported()
+  {
+    assertSqlAndNativeQueriesFail(QUERY_UNSUPPORTED_TEST_CONTEXT_KEY, "501");
+  }
+
+  @Test
+  public void testQueryResourceLimitExceeded()
+  {
+    assertSqlAndNativeQueriesFail(RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY, "400");
+  }
+
+  @Test
+  public void testQueryFailure()
+  {
+    assertSqlAndNativeQueriesFail(QUERY_FAILURE_TEST_CONTEXT_KEY, "500");
+  }
+
+  /**
+   * Asserts that running the given SQL fails with an {@link HttpResponseException}
+   * whose message contains {@code "400 Bad Request"}.
+   *
+   * @param sql SQL text that is expected to be rejected
+   */
+  private void assertSqlPlanningFails(String sql)
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            Exception.class,
+            () -> cluster.runSql(sql)
+        ),
+        ExceptionMatcher.of(HttpResponseException.class)
+                        .expectMessageContains("400 Bad Request")
+    );
+  }
+
+  /**
+   * Submits the test query first as SQL and then as a native query, each carrying
+   * {@code contextKey} in its context, and asserts that both fail with an
+   * {@link HttpResponseException} whose message contains {@code expectedStatus}.
+   *
+   * @param contextKey     context flag that selects the error to simulate
+   * @param expectedStatus text (an HTTP status code) expected in the exception message
+   */
+  private void assertSqlAndNativeQueriesFail(String contextKey, String expectedStatus)
+  {
+    // SQL
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            Exception.class,
+            () -> cluster.callApi().onAnyBroker(
+                b -> sqlQueryFuture(b, contextKey)
+            )
+        ),
+        ExceptionMatcher.of(HttpResponseException.class)
+                        .expectMessageContains(expectedStatus)
+    );
+
+    // Native
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            Exception.class,
+            () -> cluster.callApi().onAnyBroker(
+                b -> nativeQueryFuture(b, contextKey)
+            )
+        ),
+        ExceptionMatcher.of(HttpResponseException.class)
+                        .expectMessageContains(expectedStatus)
+    );
+  }
+
+  /**
+   * Builds a query context with caching disabled and the given flag set to {@code true}.
+   *
+   * @param key context key to enable
+   * @return a new mutable context map
+   */
+  private static Map<String, Object> buildTestContext(String key)
+  {
+    final Map<String, Object> context = new HashMap<>();
+    // Disable cache so that each run hits historical.
+    context.put(QueryContexts.USE_CACHE_KEY, false);
+    context.put(key, true);
+    return context;
+  }
+
+  /**
+   * Set up a SQL query future for the test.
+   *
+   * @param b          broker client used to submit the query
+   * @param contextKey context flag passed to {@link #buildTestContext(String)}
+   */
+  private ListenableFuture<String> sqlQueryFuture(BrokerClient b, String contextKey)
+  {
+    return b.submitSqlQuery(new ClientSqlQuery(
+        StringUtils.format("SELECT * FROM %s LIMIT 1", tableName),
+        null,
+        false,
+        false,
+        false,
+        buildTestContext(contextKey),
+        List.of()
+    ));
+  }
+
+  /**
+   * Set up a native query future for the test.
+   *
+   * @param b          broker client used to submit the query
+   * @param contextKey context flag passed to {@link #buildTestContext(String)}
+   */
+  private ListenableFuture<String> nativeQueryFuture(BrokerClient b, String contextKey)
+  {
+    return b.submitNativeQuery(new Druids.ScanQueryBuilder()
+                                   .dataSource(tableName)
+                                   .eternityInterval()
+                                   .limit(1)
+                                   .context(buildTestContext(contextKey))
+                                   .build()
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
new file mode 100644
index 00000000000..2e1bdb69465
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryRetryOnMissingSegmentsTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY;
+import static 
org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTest.QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY;
+
+/**
+ * This class tests the query retry on missing segments. A segment can be missing in a historical during
+ * a query if the historical drops the segment after the broker issues the query to the historical.
+ * To mimic this case, this test spawns a historical modified for testing. This historical announces
+ * all segments assigned, but doesn't always serve all of them. Instead, it can report some segments
+ * as missing.
+ */
+public class QueryRetryOnMissingSegmentsTest extends QueryTestBase
+{
+  /**
+   * This enumeration represents an expectation after finishing running the test query.
+   */
+  private enum Expectation
+  {
+    /**
+     * Expect that the test query succeeds and returns correct results.
+     */
+    ALL_SUCCESS,
+    /**
+     * Expect that the test query returns the 200 HTTP response, but will surely return incorrect result.
+     */
+    INCORRECT_RESULT,
+    /**
+     * Expect that the test query must return the 500 HTTP response.
+     * Only the failure itself is asserted by {@link #test}, not the status code.
+     */
+    QUERY_FAILURE
+  }
+
+  private ObjectMapper jsonMapper;
+  private String tableName;
+
+  /**
+   * Builds a cluster with one server of each type and registers
+   * {@link ServerManagerForQueryErrorTestModule} as an extension.
+   */
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");
+    indexer.setServerMemory(400_000_000)
+           .addProperty("druid.worker.capacity", "4")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(overlord)
+                               .addServer(coordinator)
+                               .addServer(broker)
+                               .addServer(router)
+                               .addServer(indexer)
+                               .addServer(historical)
+                               .addExtension(ServerManagerForQueryErrorTestModule.class);
+  }
+
+  /**
+   * Runs the basic index task into a fresh datasource and waits until all of
+   * its segments are available for querying.
+   */
+  @Override
+  public void beforeAll()
+  {
+    jsonMapper = overlord.bindings().jsonMapper();
+    tableName = EmbeddedClusterApis.createTestDatasourceName();
+
+    final String taskId = IdUtils.getRandomId();
+    final IndexTask task = MoreResources.Task.BASIC_INDEX.get().dataSource(tableName).withId(taskId);
+    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(tableName, coordinator, broker);
+  }
+
+  @Test
+  public void testWithRetriesDisabledPartialResultDisallowed()
+  {
+    // Since retry is disabled and a partial result is not allowed, the query must fail.
+    test(buildQuery(0, false, -1), Expectation.QUERY_FAILURE);
+  }
+
+  @Test
+  public void testWithRetriesDisabledPartialResultAllowed()
+  {
+    // Since retry is disabled but a partial result is allowed, the query must succeed.
+    // However, the query must return an incorrect result.
+    test(buildQuery(0, true, -1), Expectation.INCORRECT_RESULT);
+  }
+
+  @Test
+  public void testWithRetriesEnabledPartialResultDisallowed()
+  {
+    // Since retry is enabled, the query must succeed even though a partial result is disallowed.
+    // The retry count is set to 1 since on the first retry of the query (i.e. second overall try),
+    // the historical will start processing the segment and not call it missing.
+    // The query must return correct results.
+    test(buildQuery(1, false, -1), Expectation.ALL_SUCCESS);
+  }
+
+  @Test
+  public void testFailureWhenLastSegmentIsMissingWithPartialResultsDisallowed()
+  {
+    // Since retry is disabled and a partial result is not allowed, the query must fail since
+    // the last segment is missing/unavailable.
+    test(buildQuery(0, false, 2), Expectation.QUERY_FAILURE);
+  }
+
+  /**
+   * Submits the query to a broker once and verifies the outcome against the expectation.
+   * <p>
+   * The exception thrown by the query, if any, is retained so that an unexpected
+   * failure is reported together with its cause rather than being swallowed.
+   *
+   * @param clientSqlQuery query to submit
+   * @param expectation    outcome the query must produce
+   */
+  private void test(ClientSqlQuery clientSqlQuery, Expectation expectation)
+  {
+    String resultAsJson = null;
+    Exception failure = null;
+    try {
+      resultAsJson = cluster.callApi().onAnyBroker(b -> b.submitSqlQuery(clientSqlQuery));
+    }
+    catch (Exception e) {
+      failure = e;
+    }
+
+    if (expectation == Expectation.QUERY_FAILURE) {
+      Assertions.assertNotNull(
+          failure,
+          "Expected the query to fail but it returned result: " + resultAsJson
+      );
+      return;
+    }
+
+    // Every remaining expectation requires a successful response.
+    if (failure != null) {
+      throw new AssertionError("Query failed unexpectedly, expectation was " + expectation, failure);
+    }
+
+    final List<Map<String, Object>> result = JacksonUtils.readValue(
+        jsonMapper,
+        resultAsJson.getBytes(StandardCharsets.UTF_8),
+        new TypeReference<>()
+        {
+        }
+    );
+    Assertions.assertEquals(1, result.size());
+
+    switch (expectation) {
+      case ALL_SUCCESS:
+        Assertions.assertEquals(10, result.get(0).get("cnt"));
+        break;
+      case INCORRECT_RESULT:
+        // When the result is expected to be incorrect, we just check that the count is not the expected value.
+        Assertions.assertNotEquals(10, result.get(0).get("cnt"));
+        break;
+      default:
+        throw new ISE("Unknown expectation[%s]", expectation);
+    }
+  }
+
+  /**
+   * Builds the SQL count query over the test datasource with a context created by
+   * {@link #buildContext(int, boolean, int)}.
+   */
+  private ClientSqlQuery buildQuery(
+      int numRetriesOnMissingSegments,
+      boolean allowPartialResults,
+      int unavailableSegmentIdx
+  )
+  {
+    return new ClientSqlQuery(
+        StringUtils.format("SELECT count(item) as cnt FROM %s", tableName),
+        null,
+        false,
+        false,
+        false,
+        buildContext(
+            numRetriesOnMissingSegments,
+            allowPartialResults,
+            unavailableSegmentIdx
+        ),
+        List.of()
+    );
+  }
+
+  /**
+   * Builds a query context with caching disabled, the retry-test flag enabled and the
+   * given retry settings.
+   *
+   * @param numRetriesOnMissingSegments value for {@link QueryContexts#NUM_RETRIES_ON_MISSING_SEGMENTS_KEY}
+   * @param allowPartialResults         value for {@link QueryContexts#RETURN_PARTIAL_RESULTS_KEY}
+   * @param unavailableSegmentIdx       value for the unavailable-segment index key; the tests pass
+   *                                    {@code -1} when no specific segment is targeted
+   * @return a new mutable context map
+   */
+  private static Map<String, Object> buildContext(
+      int numRetriesOnMissingSegments,
+      boolean allowPartialResults,
+      int unavailableSegmentIdx
+  )
+  {
+    final Map<String, Object> context = new HashMap<>();
+    // Disable cache so that each run hits historical.
+    context.put(QueryContexts.USE_CACHE_KEY, false);
+    context.put(QueryContexts.USE_RESULT_LEVEL_CACHE_KEY, false);
+    context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
+    context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
+    context.put(QUERY_RETRY_TEST_CONTEXT_KEY, true);
+    context.put(QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY, unavailableSegmentIdx);
+    return context;
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
new file mode 100644
index 00000000000..9469dba543c
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryTestBase.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+/**
+ * Base class for embedded query tests. It holds one embedded server of each type,
+ * resolves the broker and router SQL endpoints before the tests run, and offers
+ * {@link #executeQuery} for issuing raw HTTP requests against them.
+ */
+public abstract class QueryTestBase extends EmbeddedClusterTestBase
+{
+  protected static final String SQL_QUERY_ROUTE = "%s/druid/v2/sql/";
+  // NOTE(review): presumably true = query via broker, false = query via router; confirm against the tests using it.
+  public static final List<Boolean> SHOULD_USE_BROKER_TO_QUERY = List.of(true, false);
+
+  protected final EmbeddedBroker broker = new EmbeddedBroker();
+  protected final EmbeddedRouter router = new EmbeddedRouter();
+  protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  protected final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  protected final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  protected final EmbeddedHistorical historical = new EmbeddedHistorical();
+
+  // The fields below are initialized in setUp().
+  protected HttpClient httpClientRef;
+  protected String brokerEndpoint;
+  protected String routerEndpoint;
+
+  /**
+   * Hook for the additional setup that needs to be done before all tests.
+   */
+  protected void beforeAll()
+  {
+    // No-op by default
+  }
+
+  /**
+   * Builds a cluster containing the overlord, coordinator, broker, router, indexer
+   * and historical held by this class.
+   */
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");
+    indexer.setServerMemory(500_000_000)
+           .addProperty("druid.worker.capacity", "4")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .useLatchableEmitter()
+                               .addServer(overlord)
+                               .addServer(coordinator)
+                               .addServer(broker)
+                               .addServer(router)
+                               .addServer(indexer)
+                               .addServer(historical);
+  }
+
+  /**
+   * Captures the router's global HTTP client, resolves the SQL endpoints and then
+   * runs the {@link #beforeAll()} hook. Any exception from the hook is rethrown as
+   * an {@link AssertionError}.
+   */
+  @BeforeAll
+  void setUp()
+  {
+    httpClientRef = router.bindings().globalHttpClient();
+    brokerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(broker));
+    routerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(router));
+    try {
+      beforeAll();
+    }
+    catch (Exception e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  /**
+   * Execute a SQL query against the given endpoint via the HTTP client.
+   *
+   * @param endpoint    URL to POST to
+   * @param contentType value of the {@code Content-Type} header, or null to omit the header
+   * @param query       request body, or null to send no body
+   * @param onRequest   optional callback to customize the request before it is sent, may be null
+   * @param onResponse  callback that receives the response status code and the trimmed response body
+   * @throws AssertionError if the endpoint is not a valid URL or the request could not be executed
+   */
+  protected void executeQuery(
+      String endpoint,
+      String contentType,
+      String query,
+      Consumer<Request> onRequest,
+      BiConsumer<Integer, String> onResponse
+  )
+  {
+    final URL url;
+    try {
+      url = new URL(endpoint);
+    }
+    catch (MalformedURLException e) {
+      // Keep the cause and the offending value so that the failure is diagnosable.
+      throw new AssertionError(StringUtils.format("Malformed URL[%s]", endpoint), e);
+    }
+
+    final Request request = new Request(HttpMethod.POST, url);
+    if (contentType != null) {
+      request.addHeader("Content-Type", contentType);
+    }
+
+    if (query != null) {
+      request.setContent(query.getBytes(StandardCharsets.UTF_8));
+    }
+
+    if (onRequest != null) {
+      onRequest.accept(request);
+    }
+
+    final StatusResponseHolder response;
+    try {
+      response = httpClientRef.go(request, StatusResponseHandler.getInstance())
+                              .get();
+    }
+    catch (InterruptedException e) {
+      // Restore the interrupt flag so that the test runner can observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new AssertionError("Failed to execute a request", e);
+    }
+    catch (ExecutionException e) {
+      throw new AssertionError("Failed to execute a request", e);
+    }
+
+    Assertions.assertNotNull(response);
+
+    onResponse.accept(
+        response.getStatus().getCode(),
+        response.getContent().trim()
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
new file mode 100644
index 00000000000..de0c62d52a9
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTest.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.guava.Accumulator;
+import org.apache.druid.java.util.common.guava.FunctionalIterable;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSegmentAndDescriptor;
+import org.apache.druid.query.LeafSegmentsBundle;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryUnsupportedException;
+import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.planning.ExecutionVertex;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.ServerManager;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.timeline.partition.PartitionChunk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This server manager is designed to test various query failures.
+ * <ul>
+ *  <li> Missing segments. A segment can be missing during a query if a historical drops the segment
+ *   after the broker issues the query to the historical. To mimic this situation, the historical
+ *   with this server manager announces all segments assigned, and reports missing segments based on the following:
+ *   <ul>
+ *     <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} and {@link #QUERY_RETRY_TEST_CONTEXT_KEY} are set,
+ *           the segment at that index is reported as missing exactly once.</li>
+ *     <li> If {@link #QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY} is not set or is -1, it simulates missing segments
+ *     starting from the beginning, up to {@link #MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS}.</li>
+ *   </ul>
+ *   The missing report is generated only the first time. After that report, upon retry, all segments are served
+ *   for the datasource. See QueryRetryOnMissingSegmentsTest. </li>
+ * <li> Other query errors. This server manager returns a sequence that always throws an exception
+ *   based on a given query context value. See QueryErrorTest. </li>
+ * </ul>
+ *
+ * @see org.apache.druid.query.RetryQueryRunner for query retrying.
+ * @see org.apache.druid.client.JsonParserIterator for handling query errors from historicals.
+ */
+public class ServerManagerForQueryErrorTest extends ServerManager
+{
+  // Query context key that indicates this query is for query retry testing (missing-segment simulation).
+  public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
+  public static final String QUERY_TIMEOUT_TEST_CONTEXT_KEY = 
"query-timeout-test"; // per-segment runners throw QueryTimeoutException
+  public static final String QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY = 
"query-capacity-exceeded-test"; // per-segment runners throw QueryCapacityExceededException
+  public static final String QUERY_UNSUPPORTED_TEST_CONTEXT_KEY = 
"query-unsupported-test"; // per-segment runners throw QueryUnsupportedException
+  public static final String RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY = 
"resource-limit-exceeded-test"; // per-segment runners throw ResourceLimitExceededException
+  public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = 
"query-failure-test"; // per-segment runners throw a plain RuntimeException
+  /**
+   * Query context key that indicates which segment should be marked as unavailable/missing.
+   * This should be used in conjunction with {@link #QUERY_RETRY_TEST_CONTEXT_KEY}.
+   * <p>
+   * A value of {@code 0} means the first segment will be reported as missing, {@code 1} for the second, and so on.
+   * If this key is not set (default = -1), the test will instead simulate missing up to
+   * {@link #MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS} segments from the beginning.
+   * </p>
+   * NOTE(review): the value is compared against a per-query count of already ignored segments rather than
+   * against a segment position, so the behavior for values of 1 or more should be confirmed.
+   */
+  public static final String QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY = 
"unavailable-segment-idx";
+  private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 1; // limit for the fallback "ignore from the beginning" branch
+
+  private static final Logger LOG = new 
Logger(ServerManagerForQueryErrorTest.class);
+
+  private final ConcurrentHashMap<String, Integer> queryToIgnoredSegments = 
new ConcurrentHashMap<>(); // query id -> number of segments reported as missing so far
+  /**
+   * Creates the test server manager. Every injected dependency is forwarded unchanged to the
+   * {@link ServerManager} constructor; the policy enforcer is not injected and is always
+   * {@link NoopPolicyEnforcer#instance()}.
+   */
+  @Inject
+  public ServerManagerForQueryErrorTest(
+      QueryRunnerFactoryConglomerate conglomerate,
+      ServiceEmitter emitter,
+      QueryProcessingPool queryProcessingPool,
+      CachePopulator cachePopulator,
+      @Smile ObjectMapper objectMapper,
+      Cache cache,
+      CacheConfig cacheConfig,
+      SegmentManager segmentManager,
+      ServerConfig serverConfig
+  )
+  {
+    super(
+        conglomerate,
+        emitter,
+        queryProcessingPool,
+        cachePopulator,
+        objectMapper,
+        cache,
+        cacheConfig,
+        segmentManager,
+        serverConfig,
+        NoopPolicyEnforcer.instance()
+    );
+  }
+  /**
+   * Builds the runner used for a query over the given segment descriptors, optionally pretending
+   * that some of them are missing.
+   * <p>
+   * If the segment manager has no timeline for the query's base table datasource, the returned
+   * runner adds every descriptor in {@code specs} to the missing segments of the response context
+   * and returns an empty sequence. Otherwise a {@code ResourceManagingQueryRunner} is returned
+   * whose {@code getLeafSegmentsBundle} is overridden: when {@link #QUERY_RETRY_TEST_CONTEXT_KEY}
+   * is true in the query context, a per-query counter kept in {@code queryToIgnoredSegments}
+   * (keyed by {@code query.getMostSpecificId()}) decides which descriptors are reported as missing
+   * without consulting the timeline. All other descriptors are resolved against the timeline and
+   * the segment manager.
+   * <p>
+   * NOTE(review): the counter counts descriptors already ignored for the query id, not the
+   * position of a descriptor in {@code specs}. With
+   * {@link #MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS} equal to 1, an index of 1 ignores the first
+   * two descriptors seen for the query id, and an index of 2 or more ignores only the first one.
+   * Confirm that this is the intended behavior.
+   *
+   * @param query query to run; its context selects the retry-test behavior
+   * @param specs descriptors of the segments the caller wants to query
+   * @return a runner for the requested segments
+   */
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, 
Iterable<SegmentDescriptor> specs)
+  {
+    final ExecutionVertex ev = ExecutionVertex.of(query);
+    final Optional<VersionedIntervalTimeline<String, DataSegment>> 
maybeTimeline =
+        segmentManager.getTimeline(ev.getBaseTableDataSource());
+    if (maybeTimeline.isEmpty()) {
+      return (queryPlus, responseContext) -> {
+        responseContext.addMissingSegments(Lists.newArrayList(specs));
+        return Sequences.empty();
+      };
+    }
+
+    final QueryRunnerFactory<T, Query<T>> factory = 
getQueryRunnerFactory(query);
+    final QueryToolChest<T, Query<T>> toolChest = getQueryToolChest(query, 
factory);
+    final VersionedIntervalTimeline<String, DataSegment> timeline = 
maybeTimeline.get();
+
+    return new ResourceManagingQueryRunner<>(timeline, factory, toolChest, ev, 
specs)
+    {
+      @Override
+      protected LeafSegmentsBundle getLeafSegmentsBundle(Query<T> query, 
SegmentMapFunction segmentMapFunction)
+      {
+        // override this method so that we can artificially create missing 
segments
+        final List<DataSegmentAndDescriptor> segments = new ArrayList<>();
+        final ArrayList<SegmentDescriptor> missingSegments = new ArrayList<>();
+        final ArrayList<SegmentReference> segmentReferences = new 
ArrayList<>();
+        final ArrayList<DataSegmentAndDescriptor> loadableSegments = new 
ArrayList<>();
+
+        final QueryContext queryContext = query.context();
+
+        for (SegmentDescriptor descriptor : specs) {
+          final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
+          if (queryContext.getBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
+            final int unavailableSegmentIdx = 
queryContext.getInt(QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY, -1);
+            queryToIgnoredSegments.compute(
+                query.getMostSpecificId(),
+                (queryId, ignoreCounter) -> {
+                  if (ignoreCounter == null) {
+                    ignoreCounter = 0; // first descriptor seen for this query id
+                  }
+
+                  if (unavailableSegmentIdx >= 0 && unavailableSegmentIdx == 
ignoreCounter) {
+                    // Fail exactly once when counter matches the configured 
retry index
+                    ignoreCounter++;
+                    isIgnoreSegment.setTrue();
+                  } else if (ignoreCounter < 
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+                    // Fail up to N times for this query
+                    ignoreCounter++;
+                    isIgnoreSegment.setTrue();
+                  }
+                  return ignoreCounter;
+                }
+            );
+
+            if (isIgnoreSegment.isTrue()) {
+              LOG.info(
+                  "Pretending I don't have segment[%s]",
+                  descriptor
+              );
+              missingSegments.add(descriptor);
+              continue; // skip the timeline lookup for the pretended-missing descriptor
+            }
+          }
+
+          final PartitionChunk<DataSegment> chunk = timeline.findChunk(
+              descriptor.getInterval(),
+              descriptor.getVersion(),
+              descriptor.getPartitionNumber()
+          );
+
+          if (chunk != null) {
+            segments.add(new DataSegmentAndDescriptor(chunk.getObject(), 
descriptor));
+          } else {
+            missingSegments.add(descriptor);
+          }
+        }
+
+        // inlined logic of ServerManager.getSegmentsBundle
+        for (DataSegmentAndDescriptor segment : segments) {
+          final DataSegment dataSegment = segment.getDataSegment();
+          if (dataSegment == null) {
+            missingSegments.add(segment.getDescriptor());
+            continue;
+          }
+          Optional<Segment> ref = 
segmentManager.acquireCachedSegment(dataSegment);
+          if (ref.isPresent()) {
+            segmentReferences.add(
+                new SegmentReference(
+                    segment.getDescriptor(),
+                    segmentMapFunction.apply(ref),
+                    null
+                )
+            );
+          } else if (segmentManager.canLoadSegmentOnDemand(dataSegment)) {
+            loadableSegments.add(segment);
+          } else {
+            missingSegments.add(segment.getDescriptor());
+          }
+        }
+        return new LeafSegmentsBundle(segmentReferences, loadableSegments, 
missingSegments);
+      }
+    };
+  }
+  /**
+   * Creates one runner per segment reference, replacing the real runner with one that always
+   * fails when the query context asks for a simulated error.
+   * <p>
+   * The context keys are checked in the order below and the first one that is true wins. The
+   * returned runner produces a sequence whose {@code accumulate} and {@code toYielder} both throw:
+   * <ul>
+   *   <li>{@link #QUERY_TIMEOUT_TEST_CONTEXT_KEY}: {@link QueryTimeoutException}</li>
+   *   <li>{@link #QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY}: {@link QueryCapacityExceededException}</li>
+   *   <li>{@link #QUERY_UNSUPPORTED_TEST_CONTEXT_KEY}: {@link QueryUnsupportedException}</li>
+   *   <li>{@link #RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY}: {@link ResourceLimitExceededException}</li>
+   *   <li>{@link #QUERY_FAILURE_TEST_CONTEXT_KEY}: a plain {@link RuntimeException}</li>
+   * </ul>
+   * If none of the keys is true, the runner comes from {@code buildQueryRunnerForSegment}. A
+   * reference without a segment results in a defensive {@code DruidException}.
+   */
+  @Override
+  protected <T> FunctionalIterable<QueryRunner<T>> getQueryRunnersForSegments(
+      Query<T> query,
+      QueryRunnerFactory<T, Query<T>> factory,
+      QueryToolChest<T, Query<T>> toolChest,
+      List<SegmentReference> segmentReferences,
+      AtomicLong cpuTimeAccumulator,
+      Optional<byte[]> cacheKeyPrefix
+  )
+  {
+    return FunctionalIterable
+        .create(segmentReferences)
+        .transform(
+            ref ->
+                ref.getSegmentReference()
+                   .map(segment -> {
+                          final QueryContext queryContext = query.context();
+                          if 
(queryContext.getBoolean(QUERY_TIMEOUT_TEST_CONTEXT_KEY, false)) {
+                            return (QueryRunner<T>) (queryPlus, 
responseContext) -> new Sequence<>()
+                            {
+                              @Override
+                              public <OutType> OutType accumulate(
+                                  OutType initValue,
+                                  Accumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new QueryTimeoutException("query timeout 
test");
+                              }
+
+                              @Override
+                              public <OutType> Yielder<OutType> toYielder(
+                                  OutType initValue,
+                                  YieldingAccumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new QueryTimeoutException("query timeout 
test");
+                              }
+                            };
+                          } else if 
(queryContext.getBoolean(QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY, false)) {
+                            return (QueryRunner<T>) (queryPlus, 
responseContext) -> new Sequence<>()
+                            {
+                              @Override
+                              public <OutType> OutType accumulate(
+                                  OutType initValue,
+                                  Accumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw 
QueryCapacityExceededException.withErrorMessageAndResolvedHost(
+                                    "query capacity exceeded test"
+                                );
+                              }
+
+                              @Override
+                              public <OutType> Yielder<OutType> toYielder(
+                                  OutType initValue,
+                                  YieldingAccumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw 
QueryCapacityExceededException.withErrorMessageAndResolvedHost(
+                                    "query capacity exceeded test"
+                                );
+                              }
+                            };
+                          } else if 
(queryContext.getBoolean(QUERY_UNSUPPORTED_TEST_CONTEXT_KEY, false)) {
+                            return (QueryRunner<T>) (queryPlus, 
responseContext) -> new Sequence<>()
+                            {
+                              @Override
+                              public <OutType> OutType accumulate(
+                                  OutType initValue,
+                                  Accumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new QueryUnsupportedException("query 
unsupported test");
+                              }
+
+                              @Override
+                              public <OutType> Yielder<OutType> toYielder(
+                                  OutType initValue,
+                                  YieldingAccumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new QueryUnsupportedException("query 
unsupported test");
+                              }
+                            };
+                          } else if 
(queryContext.getBoolean(RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY, false)) {
+                            return (QueryRunner<T>) (queryPlus, 
responseContext) -> new Sequence<>()
+                            {
+                              @Override
+                              public <OutType> OutType accumulate(
+                                  OutType initValue,
+                                  Accumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new 
ResourceLimitExceededException("resource limit exceeded test");
+                              }
+
+                              @Override
+                              public <OutType> Yielder<OutType> toYielder(
+                                  OutType initValue,
+                                  YieldingAccumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new 
ResourceLimitExceededException("resource limit exceeded test");
+                              }
+                            };
+                          } else if 
(queryContext.getBoolean(QUERY_FAILURE_TEST_CONTEXT_KEY, false)) {
+                            return (QueryRunner<T>) (queryPlus, 
responseContext) -> new Sequence<>()
+                            {
+                              @Override
+                              public <OutType> OutType accumulate(
+                                  OutType initValue,
+                                  Accumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new RuntimeException("query failure 
test");
+                              }
+
+                              @Override
+                              public <OutType> Yielder<OutType> toYielder(
+                                  OutType initValue,
+                                  YieldingAccumulator<OutType, T> accumulator
+                              )
+                              {
+                                throw new RuntimeException("query failure 
test");
+                              }
+                            };
+                          }
+
+                          return buildQueryRunnerForSegment(
+                              ref.getSegmentDescriptor(),
+                              segment,
+                              factory,
+                              toolChest,
+                              cpuTimeAccumulator,
+                              cacheKeyPrefix
+                          );
+                        }
+                   ).orElseThrow(
+                       () -> DruidException.defensive("Unexpected missing 
segment[%s]", ref.getSegmentDescriptor())
+                   )
+        );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
new file mode 100644
index 00000000000..8e4b71a3bda
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/ServerManagerForQueryErrorTestModule.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.google.inject.Binder;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.LoadScope;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.server.ServerManager;
+/**
+ * Guice module that binds {@link ServerManager} to {@link ServerManagerForQueryErrorTest} as a
+ * {@link LazySingleton}, so that queries can be made to fail or report missing segments through
+ * query context flags.
+ * <p>
+ * The {@link LoadScope} annotation names the historical role, so the override is presumably applied
+ * only on historicals (confirm against the {@code LoadScope} contract).
+ */
+@LoadScope(roles = NodeRole.HISTORICAL_JSON_NAME)
+public class ServerManagerForQueryErrorTestModule implements DruidModule
+{
+  @Override
+  public void configure(Binder binder)
+  {
+    
binder.bind(ServerManager.class).to(ServerManagerForQueryErrorTest.class).in(LazySingleton.class);
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
index 5914001163a..3ce50da68da 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/SqlQueryHttpRequestHeadersTest.java
@@ -19,120 +19,23 @@
 
 package org.apache.druid.testing.embedded.query;
 
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.testing.embedded.EmbeddedBroker;
-import org.apache.druid.testing.embedded.EmbeddedCoordinator;
-import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
-import org.apache.druid.testing.embedded.EmbeddedIndexer;
-import org.apache.druid.testing.embedded.EmbeddedOverlord;
-import org.apache.druid.testing.embedded.EmbeddedRouter;
-import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
-import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.FieldSource;
 
 import javax.ws.rs.core.MediaType;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 
 /**
  * Suite to test various Content-Type headers attached
  * to SQL query HTTP requests to brokers and routers.
  */
-public class SqlQueryHttpRequestHeadersTest extends EmbeddedClusterTestBase
+public class SqlQueryHttpRequestHeadersTest extends QueryTestBase
 {
-  private static final String SQL_QUERY_ROUTE = "%s/druid/v2/sql/";
-
-  public static List<Boolean> shouldUseQueryBroker = List.of(true, false);
-
-  private final EmbeddedBroker broker = new EmbeddedBroker();
-  private final EmbeddedRouter router = new EmbeddedRouter();
-
-  private HttpClient httpClientRef;
-  private String brokerEndpoint;
-  private String routerEndpoint;
-
-  @Override
-  protected EmbeddedDruidCluster createCluster()
-  {
-    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
-                               .useLatchableEmitter()
-                               .addServer(new EmbeddedOverlord())
-                               .addServer(new EmbeddedCoordinator())
-                               .addServer(broker)
-                               .addServer(router)
-                               .addServer(new EmbeddedIndexer());
-  }
-
-  @BeforeEach
-  void setUp()
-  {
-    httpClientRef = router.bindings().globalHttpClient();
-    brokerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(broker));
-    routerEndpoint = StringUtils.format(SQL_QUERY_ROUTE, getServerUrl(router));
-  }
-
-  private void executeQuery(
-      String endpoint,
-      String contentType,
-      String query,
-      Consumer<Request> onRequest,
-      BiConsumer<Integer, String> onResponse
-  )
-  {
-    URL url;
-    try {
-      url = new URL(endpoint);
-    }
-    catch (MalformedURLException e) {
-      throw new AssertionError("Malformed URL");
-    }
-
-    Request request = new Request(HttpMethod.POST, url);
-    if (contentType != null) {
-      request.addHeader("Content-Type", contentType);
-    }
-
-    if (query != null) {
-      request.setContent(query.getBytes(StandardCharsets.UTF_8));
-    }
-
-    if (onRequest != null) {
-      onRequest.accept(request);
-    }
-
-    StatusResponseHolder response;
-    try {
-      response = httpClientRef.go(request, StatusResponseHandler.getInstance())
-                              .get();
-    }
-    catch (InterruptedException | ExecutionException e) {
-      throw new AssertionError("Failed to execute a request", e);
-    }
-
-    Assertions.assertNotNull(response);
-
-    onResponse.accept(
-        response.getStatus().getCode(),
-        response.getContent().trim()
-    );
-  }
-
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testNullContentType(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -141,14 +44,12 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
         "select 1",
         (request) -> {
         },
-        (statusCode, responseBody) -> {
-          Assertions.assertEquals(statusCode, 
HttpResponseStatus.BAD_REQUEST.getCode());
-        }
+        (statusCode, responseBody) -> Assertions.assertEquals(statusCode, 
HttpResponseStatus.BAD_REQUEST.getCode())
     );
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testUnsupportedContentType(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -157,14 +58,15 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
         "select 1",
         (request) -> {
         },
-        (statusCode, responseBody) -> {
-          Assertions.assertEquals(statusCode, 
HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.getCode());
-        }
+        (statusCode, responseBody) -> Assertions.assertEquals(
+            statusCode,
+            HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.getCode()
+        )
     );
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testTextPlain(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -181,7 +83,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testFormURLEncoded(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -198,7 +100,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testFormURLEncoded_InvalidEncoding(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -215,7 +117,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testJSON(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -244,7 +146,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testInvalidJSONFormat(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -261,7 +163,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testEmptyQuery_TextPlain(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -278,7 +180,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testEmptyQuery_UrlEncoded(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -295,7 +197,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testBlankQuery_TextPlain(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -312,7 +214,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testEmptyQuery_JSON(boolean shouldQueryBroker)
   {
     executeQuery(
@@ -329,7 +231,7 @@ public class SqlQueryHttpRequestHeadersTest extends 
EmbeddedClusterTestBase
   }
 
   @ParameterizedTest
-  @FieldSource("shouldUseQueryBroker")
+  @FieldSource("SHOULD_USE_BROKER_TO_QUERY")
   public void testMultipleContentType_usesFirstOne(boolean shouldQueryBroker)
   {
     executeQuery(
diff --git 
a/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 
b/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index c38b51514ea..12fa4663f10 100644
--- 
a/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ 
b/embedded-tests/src/test/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -17,4 +17,5 @@
 # under the License.
 #
 
+org.apache.druid.testing.embedded.query.ServerManagerForQueryErrorTestModule
 org.apache.druid.testing.embedded.gcs.GoogleStorageTestModule
diff --git 
a/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
 
b/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
index 3703390abef..3559697aea5 100644
--- 
a/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
+++ 
b/integration-tests-ex/tools/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.druid.testing.tools.CustomNodeRoleClientModule
+org.apache.druid.testing.tools.CustomNodeRoleClientModule
\ No newline at end of file
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
index c4b7229b2f9..3e5e160c173 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java
@@ -50,7 +50,6 @@ public class ITBroadcastJoinQueryTest extends 
AbstractIndexerTest
   private static final String BROADCAST_JOIN_QUERIES_RESOURCE = 
"/queries/broadcast_join_queries.json";
   private static final String BROADCAST_JOIN_DATASOURCE = 
"broadcast_join_wikipedia_test";
 
-
   @Inject
   ServerDiscoveryFactory factory;
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
deleted file mode 100644
index 7322a32eb5a..00000000000
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ServerManagerForQueryErrorTest;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.SqlTestQueryHelper;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class tests various query failures.
- *
- * - SQL planning failures. Both {@link 
org.apache.calcite.sql.parser.SqlParseException}
- *   and {@link org.apache.calcite.tools.ValidationException} are tested using 
SQLs that must fail.
- * - Various query errors from historicals. These tests use {@link 
ServerManagerForQueryErrorTest} to make
- *   the query to always throw an exception. They verify the error code 
returned by
- *   {@link org.apache.druid.sql.http.SqlResource} and {@link 
org.apache.druid.server.QueryResource}.
- */
-@Test(groups = TestNGGroup.QUERY_ERROR)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITQueryErrorTest
-{
-  private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
-  /**
-   * A simple query used for error tests from historicals. What query is does 
not matter because the query is always
-   * expected to fail.
-   *
-   * @see ServerManagerForQueryErrorTest#buildQueryRunnerForSegment
-   */
-  private static final String NATIVE_QUERY_RESOURCE =
-      "/queries/native_query_error_from_historicals_test.json";
-  private static final String SQL_QUERY_RESOURCE =
-      "/queries/sql_error_from_historicals_test.json";
-  /**
-   * A simple sql query template used for plan failure tests.
-   */
-  private static final String SQL_PLAN_FAILURE_RESOURCE = 
"/queries/sql_plan_failure_query.json";
-
-  @Inject
-  private DataLoaderHelper dataLoaderHelper;
-  @Inject
-  private TestQueryHelper queryHelper;
-  @Inject
-  private SqlTestQueryHelper sqlHelper;
-  @Inject
-  private ObjectMapper jsonMapper;
-
-  @BeforeMethod
-  public void before()
-  {
-    // ensure that wikipedia segments are loaded completely
-    dataLoaderHelper.waitUntilDatasourceIsReady(WIKIPEDIA_DATA_SOURCE);
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*400.*")
-  public void testSqlParseException() throws Exception
-  {
-    // test a sql without SELECT
-    sqlHelper.testQueriesFromString(buildSqlPlanFailureQuery("FROM t WHERE col 
= 'a'"));
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*400.*")
-  public void testSqlValidationException() throws Exception
-  {
-    // test a sql that selects unknown column
-    sqlHelper.testQueriesFromString(
-        buildSqlPlanFailureQuery(StringUtils.format("SELECT unknown_col FROM 
%s LIMIT 1", WIKIPEDIA_DATA_SOURCE))
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*504.*")
-  public void testSqlTimeout() throws Exception
-  {
-    sqlHelper.testQueriesFromString(
-        
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*429.*")
-  public void testSqlCapacityExceeded() throws Exception
-  {
-    sqlHelper.testQueriesFromString(
-        
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*501.*")
-  public void testSqlUnsupported() throws Exception
-  {
-    sqlHelper.testQueriesFromString(
-        
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*400.*")
-  public void testSqlResourceLimitExceeded() throws Exception
-  {
-    sqlHelper.testQueriesFromString(
-        
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*500.*")
-  public void testSqlFailure() throws Exception
-  {
-    sqlHelper.testQueriesFromString(
-        
buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*504.*")
-  public void testQueryTimeout() throws Exception
-  {
-    queryHelper.testQueriesFromString(
-        
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*429.*")
-  public void testQueryCapacityExceeded() throws Exception
-  {
-    queryHelper.testQueriesFromString(
-        
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*501.*")
-  public void testQueryUnsupported() throws Exception
-  {
-    queryHelper.testQueriesFromString(
-        
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*400.*")
-  public void testResourceLimitExceeded() throws Exception
-  {
-    queryHelper.testQueriesFromString(
-        
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY)
-    );
-  }
-
-  @Test(expectedExceptions = {RuntimeException.class}, 
expectedExceptionsMessageRegExp = "(?s).*500.*")
-  public void testQueryFailure() throws Exception
-  {
-    queryHelper.testQueriesFromString(
-        
buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY)
-    );
-  }
-
-  private String buildSqlPlanFailureQuery(String sql) throws IOException
-  {
-    return StringUtils.replace(
-        AbstractIndexerTest.getResourceAsString(SQL_PLAN_FAILURE_RESOURCE),
-        "%%QUERY%%",
-        sql
-    );
-  }
-
-  private String buildHistoricalErrorSqlQuery(String contextKey) throws 
IOException
-  {
-    return StringUtils.replace(
-        AbstractIndexerTest.getResourceAsString(SQL_QUERY_RESOURCE),
-        "%%CONTEXT%%",
-        jsonMapper.writeValueAsString(buildTestContext(contextKey))
-    );
-  }
-
-  private String buildHistoricalErrorTestQuery(String contextKey) throws 
IOException
-  {
-    return StringUtils.replace(
-        AbstractIndexerTest.getResourceAsString(NATIVE_QUERY_RESOURCE),
-        "%%CONTEXT%%",
-        jsonMapper.writeValueAsString(buildTestContext(contextKey))
-    );
-  }
-
-  private static Map<String, Object> buildTestContext(String key)
-  {
-    final Map<String, Object> context = new HashMap<>();
-    // Disable cache so that each run hits historical.
-    context.put(QueryContexts.USE_CACHE_KEY, false);
-    context.put(key, true);
-    return context;
-  }
-}
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
deleted file mode 100644
index eb7bcacdb13..00000000000
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.query;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.clients.QueryResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.ServerManagerForQueryErrorTest;
-import org.apache.druid.testing.utils.QueryResultVerifier;
-import org.apache.druid.testing.utils.QueryWithResults;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractIndexerTest;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class tests the query retry on missing segments. A segment can be 
missing in a historical during a query if
- * the historical drops the segment after the broker issues the query to the 
historical. To mimic this case, this
- * test spawns a historical modified for testing. This historical announces 
all segments assigned, but doesn't serve
- * all of them always. Instead, it can report missing segments for some
- * segments. See {@link ServerManagerForQueryErrorTest} for more details.
- * <p>
- * To run this test properly, the test group must be specified as {@link 
TestNGGroup#QUERY_RETRY}.
- */
-@Test(groups = TestNGGroup.QUERY_RETRY)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITQueryRetryTestOnMissingSegments
-{
-  private static final String TWITTERSTREAM_DATA_SOURCE = "twitterstream";
-  private static final String QUERIES_RESOURCE = 
"/queries/twitterstream_queries_query_retry_test.json";
-
-  /**
-   * This enumeration represents an expectation after finishing running the 
test query.
-   */
-  private enum Expectation
-  {
-    /**
-     * Expect that the test query succeed and with correct results.
-     */
-    ALL_SUCCESS,
-    /**
-     * Expect that the test query returns the 200 HTTP response, but will 
surely return incorrect result.
-     */
-    INCORRECT_RESULT,
-    /**
-     * Expect that the test query must return the 500 HTTP response.
-     */
-    QUERY_FAILURE
-  }
-
-  @Inject
-  private CoordinatorResourceTestClient coordinatorClient;
-  @Inject
-  private TestQueryHelper queryHelper;
-  @Inject
-  private QueryResourceTestClient queryClient;
-  @Inject
-  private IntegrationTestingConfig config;
-  @Inject
-  private ObjectMapper jsonMapper;
-
-  @BeforeMethod
-  public void before()
-  {
-    // ensure that twitterstream segments are loaded completely
-    ITRetryUtil.retryUntilTrue(
-        () -> coordinatorClient.areSegmentsLoaded(TWITTERSTREAM_DATA_SOURCE), 
"twitterstream segments load"
-    );
-  }
-
-  @Test
-  public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
-  {
-    // Since retry is disabled and partial result is not allowed, the query 
must fail.
-    testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
-  }
-
-  @Test
-  public void testWithRetriesDisabledPartialResultAllowed() throws Exception
-  {
-    // Since retry is disabled but partial result is allowed, the query must 
succeed.
-    // However, the query must return incorrect result.
-    testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
-  }
-
-  @Test
-  public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
-  {
-    // Since retry is enabled, the query must succeed even though partial 
result is disallowed.
-    // The retry count is set to 1 since on the first retry of the query (i.e 
second overall try), the historical
-    // will start processing the segment and not call it missing.
-    // The query must return correct results.
-    testQueries(buildQuery(1, false), Expectation.ALL_SUCCESS);
-  }
-
-  @Test
-  public void 
testFailureWhenLastSegmentIsMissingWithPartialResultsDisallowed() throws 
Exception
-  {
-    // Since retry is disabled and partial result is not allowed, the query 
must fail since the last segment
-    // is missing/unavailable.
-    testQueries(buildQuery(0, false, 2), Expectation.QUERY_FAILURE);
-  }
-
-  private void testQueries(String queryWithResultsStr, Expectation 
expectation) throws Exception
-  {
-    final List<QueryWithResults> queries = jsonMapper.readValue(
-        queryWithResultsStr,
-        new TypeReference<>() {}
-    );
-    testQueries(queries, expectation);
-  }
-
-  private void testQueries(List<QueryWithResults> queries, Expectation 
expectation) throws Exception
-  {
-    int querySuccess = 0;
-    int queryFailure = 0;
-    int resultMatches = 0;
-    int resultMismatches = 0;
-
-    for (QueryWithResults queryWithResult : queries) {
-      final StatusResponseHolder responseHolder = queryClient
-          .queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()), 
queryWithResult.getQuery())
-          .get();
-
-      if (responseHolder.getStatus().getCode() == 
HttpResponseStatus.OK.getCode()) {
-        querySuccess++;
-
-        List<Map<String, Object>> result = jsonMapper.readValue(
-            responseHolder.getContent(),
-            new TypeReference<>() {}
-        );
-        if (!QueryResultVerifier.compareResults(
-            result,
-            queryWithResult.getExpectedResults(),
-            queryWithResult.getFieldsToTest()
-        ).isSuccess()) {
-          if (expectation != Expectation.INCORRECT_RESULT) {
-            throw new ISE(
-                "Incorrect query results for query %s \n expectedResults: %s 
\n actualResults : %s",
-                queryWithResult.getQuery(),
-                
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
-                jsonMapper.writeValueAsString(result)
-            );
-          } else {
-            resultMismatches++;
-          }
-        } else {
-          resultMatches++;
-        }
-      } else if (responseHolder.getStatus().getCode() == 
HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
-                 expectation == Expectation.QUERY_FAILURE) {
-        final Map<String, Object> response = 
jsonMapper.readValue(responseHolder.getContent(), Map.class);
-        final String errorMessage = (String) response.get("errorMessage");
-        Assert.assertNotNull(errorMessage, "errorMessage");
-        Assert.assertTrue(errorMessage.contains("No results found for 
segments"));
-        queryFailure++;
-      } else {
-        throw new ISE(
-            "Unexpected failure, code: [%s], content: [%s]",
-            responseHolder.getStatus(),
-            responseHolder.getContent()
-        );
-      }
-    }
-
-    switch (expectation) {
-      case ALL_SUCCESS:
-        Assert.assertEquals(querySuccess, 1);
-        Assert.assertEquals(queryFailure, 0);
-        Assert.assertEquals(resultMatches, 1);
-        Assert.assertEquals(resultMismatches, 0);
-        break;
-      case QUERY_FAILURE:
-        Assert.assertEquals(querySuccess, 0);
-        Assert.assertEquals(queryFailure, 1);
-        Assert.assertEquals(resultMatches, 0);
-        Assert.assertEquals(resultMismatches, 0);
-        break;
-      case INCORRECT_RESULT:
-        Assert.assertEquals(querySuccess, 1);
-        Assert.assertEquals(queryFailure, 0);
-        Assert.assertEquals(resultMatches, 0);
-        Assert.assertEquals(resultMismatches, 1);
-        break;
-      default:
-        throw new ISE("Unknown expectation[%s]", expectation);
-    }
-  }
-
-  private String buildQuery(int numRetriesOnMissingSegments, boolean 
allowPartialResults) throws IOException
-  {
-    return buildQuery(numRetriesOnMissingSegments, allowPartialResults, -1);
-  }
-
-  private String buildQuery(int numRetriesOnMissingSegments, boolean 
allowPartialResults, int unavailableSegmentIdx) throws IOException
-  {
-    return StringUtils.replace(
-        AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
-        "%%CONTEXT%%",
-        
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments, 
allowPartialResults, unavailableSegmentIdx))
-    );
-  }
-
-  private static Map<String, Object> buildContext(int 
numRetriesOnMissingSegments, boolean allowPartialResults, int 
unavailableSegmentIdx)
-  {
-    final Map<String, Object> context = new HashMap<>();
-    // Disable cache so that each run hits historical.
-    context.put(QueryContexts.USE_CACHE_KEY, false);
-    context.put(QueryContexts.USE_RESULT_LEVEL_CACHE_KEY, false);
-    context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 
numRetriesOnMissingSegments);
-    context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
-    context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, 
true);
-    
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_UNAVAILABLE_SEGMENT_IDX_KEY,
 unavailableSegmentIdx);
-    return context;
-  }
-}
diff --git 
a/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
 
b/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
deleted file mode 100644
index 92b02a83be8..00000000000
--- 
a/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-    {
-        "description": "timeseries, 1 agg, all",
-        "query": {
-            "queryType": "timeseries",
-            "dataSource": "wikipedia_editstream",
-            "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
-            "granularity": "all",
-            "aggregations": [
-                {
-                    "type": "count",
-                    "name": "rows"
-                }
-            ],
-            "context": %%CONTEXT%%
-        },
-        "expectedResults": []
-    }
-]
diff --git 
a/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
 
b/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
deleted file mode 100644
index d90441f855d..00000000000
--- 
a/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json
+++ /dev/null
@@ -1,9 +0,0 @@
-[
-  {
-    "query": {
-      "query": "SELECT count(*) from wikipedia_editstream",
-      "context": %%CONTEXT%%
-    },
-    "expectedResults": []
-  }
-]
\ No newline at end of file
diff --git 
a/integration-tests/src/test/resources/queries/sql_plan_failure_query.json 
b/integration-tests/src/test/resources/queries/sql_plan_failure_query.json
deleted file mode 100644
index dd0fbc6f1f4..00000000000
--- a/integration-tests/src/test/resources/queries/sql_plan_failure_query.json
+++ /dev/null
@@ -1,8 +0,0 @@
-[
-  {
-    "query": {
-      "query": "%%QUERY%%"
-    },
-    "expectedResults": []
-  }
-]
\ No newline at end of file
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 66178c05895..182d11905d5 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -149,7 +149,7 @@ public class EmbeddedDruidCluster implements 
EmbeddedResource
    * Configures this cluster to use the given default timeout with the
    * {@link LatchableEmitter}. Slow tests may set a higher value for the 
timeout
    * to avoid failures. If the cluster does not {@link #useLatchableEmitter()},
-   * this method has no effect. Default timeout is 10 seconds.
+   * this method has no effect. The default timeout is 10 seconds.
    */
   public EmbeddedDruidCluster useDefaultTimeoutForLatchableEmitter(long 
timeoutSeconds)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to