This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1734a9b9d5f Move MSQ ITs to embedded tests (#18465)
1734a9b9d5f is described below

commit 1734a9b9d5f2d4e188e00e43e22c972810e7dd2b
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 18:26:33 2025 +0530

    Move MSQ ITs to embedded tests (#18465)
    
    Changes:
    - Remove revised IT categories `BackwardCompatibilityMain`, 
`MultiStageQueryWithMM` and `Query`
    - Remove all code for backward compatibility IT as it is already migrated to
    `IngestionBackwardCompatibilityDockerTest`
    - Move `ITMultiStageQueryWorkerFaultToleranceTest` to 
`MSQWorkerFaultToleranceTest`
    - Move `ITMultiStageQuery` to `MultiStageQueryTest`
    - Move `ITKeyStatisticsSketchMergeMode` to 
`MSQKeyStatisticsSketchMergeModeTest`
    - Make changes to `ClusterTestingModule`, `FaultyOverlordClient` to allow 
fault tolerance testing
---
 .github/workflows/reusable-revised-its.yml         |  46 --
 .github/workflows/revised-its.yml                  |  41 +-
 .../unit-and-integration-tests-unified.yml         |  48 --
 .../testing/embedded/auth/BasicAuthMSQTest.java    |  89 +---
 .../testing/embedded/indexing/MoreResources.java   |  66 +++
 .../testing/embedded/msq/EmbeddedMSQApis.java      |  13 +-
 .../msq/MSQKeyStatisticsSketchMergeModeTest.java   | 189 +++++++
 .../embedded/msq/MSQLocalDurableStorage.java       |  52 ++
 .../embedded/msq/MSQWorkerFaultToleranceTest.java  | 135 +++++
 .../testing/embedded/msq/MultiStageQueryTest.java  | 154 ++++++
 .../testing/embedded/query/UnionQueryTest.java     | 240 +++++++++
 .../apache/druid/guice/ClusterTestingModule.java   |  25 +-
 .../testing/cluster/ClusterTestingTaskConfig.java  |  20 +-
 .../testing/cluster/task/FaultyOverlordClient.java |  87 +++-
 .../druid/guice/ClusterTestingModuleTest.java      |  24 +-
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  14 +-
 .../druid/indexing/overlord/hrtr/WorkerHolder.java |   3 +
 .../indexing/overlord/hrtr/WorkerHolderTest.java   |  15 +-
 integration-tests-ex/cases/pom.xml                 |  32 --
 .../testsEx/categories/MultiStageQueryWithMM.java  |  24 -
 .../org/apache/druid/testsEx/categories/Query.java |  24 -
 .../msq/ITKeyStatisticsSketchMergeMode.java        | 268 ----------
 .../druid/testsEx/msq/ITMultiStageQuery.java       |  30 --
 .../msq/ITMultiStageQueryWorkerFaultTolerance.java | 173 -------
 .../apache/druid/testsEx/msq/MultiStageQuery.java  | 266 ----------
 .../druid/testsEx/query/ITUnionQueryTest.java      |  31 --
 .../apache/druid/testsEx/query/UnionQueryTest.java | 207 --------
 .../cases/src/test/resources/query/union_data.json |  10 -
 .../query/union_kafka_supervisor_template.json     |  69 ---
 .../src/test/resources/query/union_queries.json    | 566 ---------------------
 integration-tests-ex/image/docker-build.sh         |  18 -
 .../apache/druid/msq/indexing/MSQWorkerTask.java   |   1 +
 32 files changed, 1013 insertions(+), 1967 deletions(-)

diff --git a/.github/workflows/reusable-revised-its.yml 
b/.github/workflows/reusable-revised-its.yml
index 568cca04352..12a3e7e11fc 100644
--- a/.github/workflows/reusable-revised-its.yml
+++ b/.github/workflows/reusable-revised-its.yml
@@ -57,19 +57,6 @@ on:
       AWS_SECRET_ACCESS_KEY:
         required: false
         type: string
-      BACKWARD_COMPATIBILITY_IT_ENABLED:
-        required: false
-        type: string
-        default: false
-      DRUID_PREVIOUS_VERSION:
-        required: false
-        type: string
-      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
-        required: false
-        type: string
-      DRUID_PREVIOUS_IT_IMAGE_NAME:
-        required: false
-        type: string
 
 env:
   MYSQL_DRIVER_CLASSNAME: ${{ inputs.mysql_driver }} # Used by tests to 
connect to metadata store directly.
@@ -119,15 +106,6 @@ jobs:
             ./druid-container-jdk${{ inputs.build_jdk }}.tar.gz
             ./integration-tests-ex/image/target/env.sh
 
-      - name: Retrieve previous version cached docker image
-        id: docker-restore-previous-version
-        if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
-        uses: actions/cache/restore@v4
-        with:
-          key: druid-container-jdk${{ inputs.build_jdk }}-version${{ 
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
-          path: |
-            ./druid-container-jdk${{ inputs.build_jdk }}-version${{ 
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz           
-
       - name: Maven build
         if: steps.maven-restore.outputs.cache-hit != 'true' || ( 
steps.docker-restore.outputs.cache-hit != 'true' && 
steps.targets-restore.outputs.cache-hit != 'true' )
         run: |
@@ -137,10 +115,6 @@ jobs:
         if: steps.docker-restore.outputs.cache-hit != 'true' || 
steps.maven-restore.outputs.cache-hit != 'true'
         env:
           docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
-          BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ 
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
-          DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
-          DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-          DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ 
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
         run: |
           ./it.sh image
           source ./integration-tests-ex/image/target/env.sh
@@ -148,15 +122,6 @@ jobs:
           echo $DRUID_IT_IMAGE_NAME
           docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ 
inputs.build_jdk }}.tar.gz
 
-      - name: Save previous version docker image
-        if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && 
(steps.docker-restore.outputs.cache-hit != 'true' || 
steps.maven-restore.outputs.cache-hit != 'true') }}
-        env:
-          docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
-        run: |
-          docker tag ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ 
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ inputs.build_jdk }}-version${{ 
inputs.DRUID_PREVIOUS_VERSION }}
-          echo ${DRUID_PREVIOUS_IT_IMAGE_NAME}
-          docker save "${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > 
druid-container-jdk${{ inputs.build_jdk }}-version${{ 
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
       - name: Stop and remove docker containers
         run: |
           echo "Force stopping all containers and pruning"
@@ -168,20 +133,9 @@ jobs:
           docker load --input druid-container-jdk${{ inputs.build_jdk }}.tar.gz
           docker images
 
-      - name: Load previous version docker image
-        if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
-        run: |
-          docker load --input druid-container-jdk${{ inputs.build_jdk 
}}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-          docker images
-
       - name: Run IT
         id: run-it
         timeout-minutes: 60
-        env:
-          BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ 
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
-          DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
-          DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-          DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ 
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
         run: |
           ${{ inputs.script }}
 
diff --git a/.github/workflows/revised-its.yml 
b/.github/workflows/revised-its.yml
index 7b71a4a9820..c662cb47d8f 100644
--- a/.github/workflows/revised-its.yml
+++ b/.github/workflows/revised-its.yml
@@ -18,24 +18,6 @@
 name: "Revised ITs workflow"
 on:
   workflow_call:
-    inputs:
-      BACKWARD_COMPATIBILITY_IT_ENABLED:
-        description: "Flag for backward compatibility IT"
-        required: false
-        default: false
-        type: string
-      DRUID_PREVIOUS_VERSION:
-        description: "Previous druid versions to run the test against."
-        required: false
-        type: string
-      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
-        description: "URL to download the previous druid version."
-        required: false
-        type: string
-      DRUID_PREVIOUS_IT_IMAGE_NAME:
-        description: "Druid previous version image name."
-        required: false
-        type: string
   workflow_dispatch:
 
 jobs:
@@ -67,7 +49,7 @@ jobs:
       fail-fast: false
       matrix:
         jdk: [17]
-        it: [MultiStageQuery, BatchIndex, MultiStageQueryWithMM, InputSource, 
InputFormat, Query, DruidExactCountBitmap]
+        it: [MultiStageQuery, BatchIndex, InputSource, InputFormat, 
DruidExactCountBitmap]
         indexer: [middleManager]
     uses: ./.github/workflows/reusable-revised-its.yml
     if: ${{ needs.changes.outputs.core == 'true' || 
needs.changes.outputs.common-extensions == 'true' }}
@@ -95,24 +77,3 @@ jobs:
       AWS_REGION: us-east-1
       AWS_ACCESS_KEY_ID: admin
       AWS_SECRET_ACCESS_KEY: miniopassword
-
-  backward-compatibility-it:
-    needs: changes
-    uses: ./.github/workflows/reusable-revised-its.yml
-    if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' && 
(needs.changes.outputs.core == 'true' || 
needs.changes.outputs.common-extensions == 'true') }}
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      use_indexer: middleManager
-      script: ./it.sh github BackwardCompatibilityMain
-      it: BackwardCompatibilityMain
-      mysql_driver: com.mysql.jdbc.Driver
-      BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ 
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
-      DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
-      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-      DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
-      DRUID_CLOUD_BUCKET: druid-qa
-      DRUID_CLOUD_PATH: aws-${{ github.run_id }}-${{ github.run_attempt }}
-      AWS_REGION: us-east-1
-      AWS_ACCESS_KEY_ID: admin
-      AWS_SECRET_ACCESS_KEY: miniopassword
diff --git a/.github/workflows/unit-and-integration-tests-unified.yml 
b/.github/workflows/unit-and-integration-tests-unified.yml
index 95648f0087f..9f073646635 100644
--- a/.github/workflows/unit-and-integration-tests-unified.yml
+++ b/.github/workflows/unit-and-integration-tests-unified.yml
@@ -51,30 +51,7 @@ env:
   SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5
 
 jobs:
-  set-env-var:
-    name: Set env var
-    runs-on: ubuntu-latest
-    outputs:
-      DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ steps.image_name.outputs.image_name }}
-      BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ steps.it_enabled.outputs.enabled 
}}
-      DRUID_PREVIOUS_VERSION: ${{ env.DRUID_PREVIOUS_VERSION }}
-      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
env.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-    steps:
-      - name: Set image name env var
-        id: image_name
-        run: |
-          echo "::set-output 
name=image_name::org.apache.druid.integration-tests/test:${{ 
env.DRUID_PREVIOUS_VERSION }}"
-      - name: Set env for enabling backward compatibility it
-        id: it_enabled
-        run: |
-          if [ -n "${{ env.DRUID_PREVIOUS_VERSION }}" ]; then
-            echo "::set-output name=enabled::true"
-          else
-            echo "::set-output name=enabled::false"
-          fi
-
   build:
-    needs: set-env-var
     name: "build (jdk${{ matrix.jdk }})"
     strategy:
       fail-fast: false
@@ -120,25 +97,12 @@ jobs:
             ./druid-container-jdk${{ matrix.jdk }}.tar.gz
             ./integration-tests-ex/image/target/env.sh
 
-      - name: Cache previous version image
-        id: docker_container_previous_version
-        uses: actions/cache@v4
-        with:
-          key: druid-container-jdk${{ matrix.jdk }}-version${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
-          path: |
-            ./druid-container-jdk${{ matrix.jdk }}-version${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
       - name: Maven build
         id: maven_build
         run: |
           ./it.sh ci
 
       - name: Container build
-        env:
-          BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ 
needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
-          DRUID_PREVIOUS_VERSION: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
-          DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-          DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
         run: |
           ./it.sh image
           source ./integration-tests-ex/image/target/env.sh
@@ -150,13 +114,6 @@ jobs:
           echo $DRUID_IT_IMAGE_NAME
           docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{ 
matrix.jdk }}.tar.gz
 
-      - name: Save previous version docker image
-        if: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 
'true' }}
-        run: |
-          docker tag ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ matrix.jdk 
}}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
-          echo ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
-          docker save "${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip > 
druid-container-jdk${{ matrix.jdk }}-version${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz    
-
   unit-tests:
     name: "unit tests"
     uses: ./.github/workflows/ci.yml
@@ -168,11 +125,6 @@ jobs:
   revised-its:
     needs: [build, unit-tests]
     uses: ./.github/workflows/revised-its.yml
-    with:
-      BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ 
needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
-      DRUID_PREVIOUS_VERSION: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
-      DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
-      DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ 
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
 
   docker-tests:
     needs: [build, unit-tests]
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
index 4a636c3c588..87012ce2c7b 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedIndexer;
 import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.EmbeddedServiceClient;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.indexing.Resources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.druid.testing.embedded.msq.MSQExportDirectory;
@@ -123,45 +124,11 @@ public class BasicAuthMSQTest extends 
EmbeddedClusterTestBase
     List<ResourceAction> permissions = ImmutableList.of();
     securityClient.setPermissionsToRole(ROLE_1, permissions);
 
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n",
-            dataSource,
-            Resources.DataFile.tinyWiki1Json().getAbsolutePath()
-        );
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+    );
 
     verifySqlSubmitFailsWith403Forbidden(queryLocal);
   }
@@ -177,45 +144,11 @@ public class BasicAuthMSQTest extends 
EmbeddedClusterTestBase
     );
     securityClient.setPermissionsToRole(ROLE_1, permissions);
 
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n",
-            dataSource,
-            Resources.DataFile.tinyWiki1Json().getAbsolutePath()
-        );
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json()
+    );
 
     final SqlTaskStatus taskStatus = userClient.onAnyBroker(
         b -> b.submitSqlTask(
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
index b50d4f2e601..205271bcfcf 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
@@ -86,6 +86,72 @@ public class MoreResources
             .appendToExisting(false);
   }
 
+  public static class MSQ
+  {
+    /**
+     * SQL to INSERT any of the tiny wiki JSON data files into a new datasource
+     * with DAY granularity. e.g. {@link Resources.DataFile#tinyWiki1Json()}.
+     */
+    public static final String INSERT_TINY_WIKI_JSON =
+        "INSERT INTO %s\n"
+        + "SELECT\n"
+        + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+        + "  isRobot,\n"
+        + "  diffUrl,\n"
+        + "  added,\n"
+        + "  countryIsoCode,\n"
+        + "  regionName,\n"
+        + "  channel,\n"
+        + "  flags,\n"
+        + "  delta,\n"
+        + "  isUnpatrolled,\n"
+        + "  isNew,\n"
+        + "  deltaBucket,\n"
+        + "  isMinor,\n"
+        + "  isAnonymous,\n"
+        + "  deleted,\n"
+        + "  cityName,\n"
+        + "  metroCode,\n"
+        + "  namespace,\n"
+        + "  comment,\n"
+        + "  page,\n"
+        + "  commentLength,\n"
+        + "  countryName,\n"
+        + "  user,\n"
+        + "  regionIsoCode\n"
+        + "FROM TABLE(\n"
+        + "  EXTERN(\n"
+        + "    '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
+        + "    '{\"type\":\"json\"}',\n"
+        + "    "
+        + "'[{\"type\":\"string\",\"name\":\"timestamp\"},"
+        + "{\"type\":\"string\",\"name\":\"isRobot\"},"
+        + "{\"type\":\"string\",\"name\":\"diffUrl\"},"
+        + "{\"type\":\"long\",\"name\":\"added\"},"
+        + "{\"type\":\"string\",\"name\":\"countryIsoCode\"},"
+        + "{\"type\":\"string\",\"name\":\"regionName\"},"
+        + "{\"type\":\"string\",\"name\":\"channel\"},"
+        + "{\"type\":\"string\",\"name\":\"flags\"},"
+        + "{\"type\":\"long\",\"name\":\"delta\"},"
+        + "{\"type\":\"string\",\"name\":\"isUnpatrolled\"},"
+        + "{\"type\":\"string\",\"name\":\"isNew\"},"
+        + "{\"type\":\"double\",\"name\":\"deltaBucket\"},"
+        + "{\"type\":\"string\",\"name\":\"isMinor\"},"
+        + "{\"type\":\"string\",\"name\":\"isAnonymous\"},"
+        + "{\"type\":\"long\",\"name\":\"deleted\"},"
+        + "{\"type\":\"string\",\"name\":\"cityName\"},"
+        + "{\"type\":\"long\",\"name\":\"metroCode\"},"
+        + 
"{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},"
+        + "{\"type\":\"string\",\"name\":\"page\"},"
+        + "{\"type\":\"long\",\"name\":\"commentLength\"},"
+        + "{\"type\":\"string\",\"name\":\"countryName\"},"
+        + "{\"type\":\"string\",\"name\":\"user\"},"
+        + "{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+        + "  )\n"
+        + ")\n"
+        + "PARTITIONED BY DAY\n";
+  }
+
   /**
    * Supervisor spec builder.
    */
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index 71967443d40..d656618db3d 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -83,6 +83,17 @@ public class EmbeddedMSQApis
    * @return The result of the SQL as a single CSV string.
    */
   public SqlTaskStatus submitTaskSql(String sql, Object... args)
+  {
+    return submitTaskSql(null, sql, args);
+  }
+
+  /**
+   * Submits the given SQL query to any of the brokers (using {@code 
BrokerClient})
+   * of the cluster, checks that the task has started and returns the {@link 
SqlTaskStatus}.
+   *
+   * @return The result of the SQL as a single CSV string.
+   */
+  public SqlTaskStatus submitTaskSql(Map<String, Object> queryContext, String 
sql, Object... args)
   {
     final SqlTaskStatus taskStatus =
         cluster.callApi().onAnyBroker(
@@ -93,7 +104,7 @@ public class EmbeddedMSQApis
                     false,
                     false,
                     false,
-                    null,
+                    queryContext,
                     null
                 )
             )
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
new file mode 100644
index 00000000000..c1d52f9dc7b
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class MSQKeyStatisticsSketchMergeModeTest extends 
EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+      .setServerMemory(300_000_000L)
+      .addProperty("druid.worker.capacity", "3");
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(indexer)
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedHistorical())
+        .addServer(new EmbeddedRouter());
+  }
+
+  @BeforeAll
+  public void initTestClient()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+  }
+
+  @Test
+  public void testMsqIngestionParallelMerging()
+  {
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json()
+    );
+
+    Map<String, Object> context = Map.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.PARALLEL
+    );
+
+    final SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, 
queryLocal);
+    cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), 
overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+        dataSource,
+        "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+        + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+        + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+    );
+  }
+
+  @Test
+  public void testMsqIngestionSequentialMerging()
+  {
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json()
+    );
+
+    Map<String, Object> context = Map.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.SEQUENTIAL
+    );
+
+    SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
+    cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), 
overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+        dataSource,
+        "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+        + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+        + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+    );
+  }
+
+
+  @Test
+  public void testMsqIngestionSequentialMergingWithEmptyStatistics()
+  {
+    String queryLocal =
+        StringUtils.format(
+            "Replace INTO %s overwrite ALL \n"
+            + "SELECT\n"
+            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
+            + "  isRobot,\n"
+            + "  diffUrl,\n"
+            + "  added,\n"
+            + "  countryIsoCode,\n"
+            + "  regionName,\n"
+            + "  channel,\n"
+            + "  flags,\n"
+            + "  delta,\n"
+            + "  isUnpatrolled,\n"
+            + "  isNew,\n"
+            + "  deltaBucket,\n"
+            + "  isMinor,\n"
+            + "  isAnonymous,\n"
+            + "  deleted,\n"
+            + "  cityName,\n"
+            + "  metroCode,\n"
+            + "  namespace,\n"
+            + "  comment,\n"
+            + "  page,\n"
+            + "  commentLength,\n"
+            + "  countryName,\n"
+            + "  user,\n"
+            + "  regionIsoCode\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"%s\",\"%s\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
+            + "  )\n"
+            + ")\n"
+            + "where delta=111 "
+            // we add this filter since delta=111 is only present in 
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
+            + "PARTITIONED BY DAY\n"
+            + "CLUSTERED BY \"__time\"",
+            dataSource,
+            Resources.DataFile.tinyWiki1Json(),
+            Resources.DataFile.tinyWiki2Json()
+        );
+
+    Map<String, Object> context = Map.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.SEQUENTIAL,
+        MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+        3
+    );
+
+    SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
+    cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(), 
overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+        dataSource,
+        "2013-08-31T07:11:21.000Z,,123,111,12,article"
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
new file mode 100644
index 00000000000..7d1fa02d60d
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedResource;
+
+import java.io.File;
+
+/**
+ * Embedded resource to use durable local storage for MSQ tasks.
+ */
+public class MSQLocalDurableStorage implements EmbeddedResource
+{
+  @Override
+  public void start() throws Exception
+  {
+    // do nothing
+  }
+
+  @Override
+  public void onStarted(EmbeddedDruidCluster cluster)
+  {
+    final File baseDir = 
cluster.getTestFolder().getOrCreateFolder("msq-durable-storage");
+    cluster.addCommonProperty("druid.msq.intermediate.storage.enable", "true")
+           .addCommonProperty("druid.msq.intermediate.storage.type", "local")
+           .addCommonProperty("druid.msq.intermediate.storage.basePath", 
baseDir.getAbsolutePath());
+  }
+
+  @Override
+  public void stop() throws Exception
+  {
+    // do nothing
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
new file mode 100644
index 00000000000..4d1299adc88
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.guice.ClusterTestingModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+/**
+ * Test to verify that cancelled worker tasks are retried when fault tolerance
+ * is enabled. This test uses the {@link ClusterTestingModule} to create a
+ * faulty Indexer which blocks the completion of the worker task. This allows
+ * time to kill off the worker before it can finish, thus triggering a 
relaunch.
+ */
+public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+      .addProperty("druid.plaintextPort", "7091")
+      .addProperty("druid.worker.capacity", "1");
+
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addExtension(ClusterTestingModule.class)
+        .addResource(new MSQLocalDurableStorage())
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(indexer)
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedHistorical());
+  }
+
+  @BeforeAll
+  public void initTestClient()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+  }
+
+  @Test
+  public void test_cancelledWorker_isRetried_ifFaultToleranceIsEnabled() 
throws Exception
+  {
+    final String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+    );
+
+    // Run the MSQ task in fault tolerance mode
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(
+        Map.of("faultTolerance", true),
+        queryLocal
+    );
+
+    // Add a faulty Indexer to the cluster so that worker is launched but 
doesn't finish
+    final EmbeddedIndexer faultyIndexer = new EmbeddedIndexer()
+        .addProperty("druid.unsafe.cluster.testing", "true")
+        
.addProperty("druid.unsafe.cluster.testing.overlordClient.taskStatusDelay", 
"PT1H")
+        .addProperty("druid.worker.capacity", "1");
+    cluster.addServer(faultyIndexer);
+    faultyIndexer.start();
+
+    // Let the worker run for a bit so that controller task moves to 
READING_INPUT phase
+    final ServiceMetricEvent matchingEvent = 
faultyIndexer.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("ingest/count")
+    );
+    final String workerTaskId = (String) 
matchingEvent.getUserDims().get(DruidMetrics.TASK_ID);
+    Thread.sleep(100);
+
+    // Cancel the worker task and verify that it has failed
+    cluster.callApi().onLeaderOverlord(o -> o.cancelTask(workerTaskId));
+    overlord.latchableEmitter().waitForEvent(
+        event -> event.hasMetricName("task/run/time")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+                      .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+    );
+    faultyIndexer.stop();
+
+    // Add a functional Indexer so that the worker is relaunched successfully
+    final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
+        .addProperty("druid.worker.capacity", "1");
+    cluster.addServer(functionalIndexer);
+    functionalIndexer.start();
+
+    // Verify that the controller task eventually succeeds
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), 
overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+        dataSource,
+        "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+        + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+        + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
new file mode 100644
index 00000000000..e12e3518798
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiStageQueryTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+      .setServerMemory(300_000_000L)
+      .addProperty("druid.worker.capacity", "2");
+  private final MSQExportDirectory exportDirectory = new MSQExportDirectory();
+
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addResource(exportDirectory)
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(indexer)
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedHistorical());
+  }
+
+  @BeforeAll
+  public void initTestClient()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+  }
+
+  @Test
+  public void testMsqIngestionAndQuerying()
+  {
+    final String sql = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+    );
+
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), 
overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+    cluster.callApi().verifySqlQuery(
+        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+        dataSource,
+        "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+        + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+        + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+    );
+  }
+
+  @Test
+  public void testExport()
+  {
+    final String exportSql =
+        StringUtils.format(
+            "INSERT INTO extern(local(exportPath => '%s'))\n"
+            + "AS CSV\n"
+            + "SELECT page, added, delta\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
+            + "    '{\"type\":\"json\"}',\n"
+            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
+            + "  )\n"
+            + ")\n",
+            new File(exportDirectory.get(), dataSource).getAbsolutePath(),
+            Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+        );
+
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(exportSql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), 
overlord.latchableEmitter());
+
+    final String selectSql = StringUtils.format(
+        "SELECT page, delta, added\n"
+        + "  FROM TABLE(\n"
+        + "    EXTERN(\n"
+        + "      
'{\"type\":\"local\",\"baseDir\":\"%s\",\"filter\":\"*.csv\"}',\n"
+        + "      '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+        + "    )\n"
+        + "  ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
+        + "   WHERE delta != 0\n"
+        + "   ORDER BY page",
+        exportDirectory.get()
+    );
+
+    final MSQTaskReportPayload statusReport = 
msqApis.runTaskSqlAndGetReport(selectSql);
+    Assertions.assertNotNull(statusReport);
+    Assertions.assertNotNull(statusReport.getResults());
+
+    MSQResultsReport resultsReport = statusReport.getResults();
+
+    List<List<Object>> actualResults = new ArrayList<>();
+    for (final Object[] row : resultsReport.getResults()) {
+      actualResults.add(Arrays.asList(row));
+    }
+
+    List<List<Object>> expectedResults = List.of(
+        List.of("Cherno Alpha", 111, 123),
+        List.of("Gypsy Danger", -143, 57),
+        List.of("Striker Eureka", 330, 459)
+    );
+
+    Assertions.assertEquals(
+        expectedResults,
+        actualResults
+    );
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
new file mode 100644
index 00000000000..05a47e8f2d6
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
+import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.search.InsensitiveContainsSearchQuerySpec;
+import org.apache.druid.query.topn.LexicographicTopNMetricSpec;
+import org.apache.druid.query.topn.TopNQueryBuilder;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class UnionQueryTest extends EmbeddedClusterTestBase
+{
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addExtensions(SketchModule.class, HllSketchModule.class, 
DoublesSketchModule.class)
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(new EmbeddedIndexer())
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedHistorical());
+  }
+
+  @Test
+  public void test_ingestData_andVerifyNativeAndSQLQueries()
+  {
+    final int numDatasources = 3;
+
+    final List<String> datasourceNames = IntStream
+        .range(0, numDatasources)
+        .mapToObj(i -> EmbeddedClusterApis.createTestDatasourceName())
+        .collect(Collectors.toList());
+
+    for (String datasourceName : datasourceNames) {
+      final Task task = MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS
+          .get()
+          .dataSource(datasourceName)
+          .withId(IdUtils.getRandomId());
+      cluster.callApi().runTask(task, overlord);
+      cluster.callApi().waitForAllSegmentsToBeAvailable(datasourceName, 
coordinator);
+    }
+
+    // Verify some native queries
+    final DataSource unionDatasource = new UnionDataSource(
+        datasourceNames.stream()
+                       .map(TableDataSource::new)
+                       .collect(Collectors.toList())
+    );
+
+    // Timeseries query
+    verifyQuery(
+        Druids
+            .newTimeseriesQueryBuilder()
+            .intervals("2013-08-31/2013-09-01")
+            .dataSource(unionDatasource)
+            .filters("language", "en")
+            .aggregators(
+                new CountAggregatorFactory("rows"),
+                new LongSumAggregatorFactory("count", "ingested_events"),
+                new DoubleSumAggregatorFactory("added", "added"),
+                new DoubleSumAggregatorFactory("deleted", "deleted"),
+                new DoubleSumAggregatorFactory("delta", "delta")
+            )
+            .build(),
+        List.of(
+            Map.of(
+                "timestamp", "2013-08-31T01:02:33.000Z",
+                "result", Map.of("count", 6, "delta", 561.0, "deleted", 987.0, 
"rows", 6, "added", 1548.0)
+            )
+        )
+    );
+
+    // Search query
+    verifyQuery(
+        Druids
+            .newSearchQueryBuilder()
+            .dataSource(unionDatasource)
+            .intervals("2013-08-31/2013-09-01")
+            .granularity(Granularities.ALL)
+            .query(new InsensitiveContainsSearchQuerySpec("ip"))
+            .build(),
+        List.of(
+            Map.of(
+                "timestamp", "2013-08-31T00:00:00.000Z",
+                "result", List.of(
+                    Map.of("dimension", "user", "value", "triplets", "count", 
3),
+                    Map.of("dimension", "namespace", "value", "wikipedia", 
"count", 9)
+                )
+            )
+        )
+    );
+
+    // TopN lexicographic
+    verifyQuery(
+        new TopNQueryBuilder()
+            .dataSource(unionDatasource)
+            .intervals("2013-08-31/2013-09-01")
+            .granularity(Granularities.ALL)
+            .aggregators(
+                new CountAggregatorFactory("rows"),
+                new LongSumAggregatorFactory("count", "ingested_events")
+            )
+            .postAggregators(
+                new ArithmeticPostAggregator(
+                    "sumOfRowsAndCount",
+                    "+",
+                    List.of(new FieldAccessPostAggregator(null, "rows"), new 
FieldAccessPostAggregator(null, "count"))
+                )
+            )
+            .dimension("language")
+            .metric(new LexicographicTopNMetricSpec("a"))
+            .threshold(3)
+            .build(),
+        List.of(
+            Map.of(
+                "timestamp", "2013-08-31T01:02:33.000Z",
+                "result",
+                List.of(
+                    Map.of("sumOfRowsAndCount", 12.0, "count", 6, "language", 
"en", "rows", 6),
+                    Map.of("sumOfRowsAndCount", 6.0, "count", 3, "language", 
"ja", "rows", 3),
+                    Map.of("sumOfRowsAndCount", 6.0, "count", 3, "language", 
"ru", "rows", 3)
+                )
+            )
+        )
+    );
+
+    // Verify some SQL queries
+    cluster.callApi().verifySqlQuery(
+        "SELECT page, COUNT(*), SUM(ingested_events), SUM(added), 
SUM(deleted), SUM(delta) FROM (%s)"
+        + " WHERE __time >= '2013-08-31' AND __time < '2013-09-01'"
+        + " GROUP BY 1 ORDER BY 4 DESC LIMIT 3",
+        unionAll("SELECT * FROM %s", datasourceNames),
+        "Crimson Typhoon,3,3,2715.0,15.0,2700.0\n"
+        + "Striker Eureka,3,3,1377.0,387.0,990.0\n"
+        + "Cherno Alpha,3,3,369.0,36.0,333.0"
+    );
+    cluster.callApi().verifySqlQuery(
+        "SELECT MIN(__time), MAX(__time) FROM (%s)",
+        unionAll("SELECT * FROM %s", datasourceNames),
+        "2013-08-31T01:02:33.000Z,2013-09-01T12:41:27.000Z"
+    );
+    cluster.callApi().verifySqlQuery(
+        "SELECT COUNT(*), SUM(ingested_events), SUM(added), SUM(deleted), 
SUM(delta) FROM (%s)"
+        + " WHERE \"language\" = 'en' AND __time >= '2013-08-31' AND __time < 
'2013-09-01'",
+        unionAll("SELECT * FROM %s", datasourceNames),
+        "6,6,1548.0,987.0,561.0"
+    );
+    cluster.callApi().verifySqlQuery(
+        unionAll(
+            "SELECT COUNT(*), SUM(ingested_events), SUM(added), SUM(deleted), 
SUM(delta)"
+            + " FROM %s"
+            + " WHERE \"language\" = 'en' AND __time >= '2013-08-31' AND 
__time < '2013-09-01'",
+            datasourceNames
+        ),
+        null,
+        "2,2,516.0,329.0,187.0\n"
+        + "2,2,516.0,329.0,187.0\n"
+        + "2,2,516.0,329.0,187.0"
+    );
+  }
+
+  /**
+   * Creates a SQL for each of the datasources and then combines them with 
{@code UNION ALL}.
+   */
+  private String unionAll(String sqlFormat, List<String> datasourceNames)
+  {
+    return datasourceNames.stream()
+                          .map(ds -> StringUtils.format(sqlFormat, ds))
+                          .collect(Collectors.joining("\nUNION ALL\n"));
+  }
+
+  private void verifyQuery(Query<?> query, List<Map<String, Object>> 
expectedResult)
+  {
+    final String resultAsJson = cluster.callApi().onAnyBroker(b -> 
b.submitNativeQuery(query));
+    final List<Map<String, Object>> resultList = JacksonUtils.readValue(
+        TestHelper.JSON_MAPPER,
+        resultAsJson.getBytes(StandardCharsets.UTF_8),
+        new TypeReference<>() {}
+    );
+    Assertions.assertEquals(expectedResult, resultList);
+  }
+}
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
index 4c32caf1f6b..4c48d1ae760 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -54,6 +54,8 @@ import java.util.Set;
 public class ClusterTestingModule implements DruidModule
 {
   private static final Logger log = new Logger(ClusterTestingModule.class);
+  private static final String PROPERTY_ENABLE = "druid.unsafe.cluster.testing";
+  private static final String PROPERTY_OVERLORD_CLIENT_CONFIG = 
"druid.unsafe.cluster.testing.overlordClient";
 
   private Set<NodeRole> roles;
   private boolean isClusterTestingEnabled = false;
@@ -65,7 +67,7 @@ public class ClusterTestingModule implements DruidModule
   )
   {
     this.isClusterTestingEnabled = Boolean.parseBoolean(
-        props.getProperty("druid.unsafe.cluster.testing", "false")
+        props.getProperty(PROPERTY_ENABLE, "false")
     );
     this.roles = roles;
   }
@@ -75,13 +77,14 @@ public class ClusterTestingModule implements DruidModule
   {
     if (isClusterTestingEnabled) {
       log.warn(
-          "Running service in cluster testing mode. This is an unsafe 
test-only"
+          "Running service with roles[%s] in cluster testing mode. This is an 
unsafe test-only"
           + " mode and must never be used in a production cluster."
-          + " Set property[druid.unsafe.cluster.testing=false] to disable 
testing mode."
+          + " Set property[%s=false] to disable testing mode.",
+          roles, PROPERTY_ENABLE
       );
       bindDependenciesForClusterTestingMode(binder);
     } else {
-      log.warn("Cluster testing is disabled. Set 
property[druid.unsafe.cluster.testing=true] to enable it.");
+      log.info("Cluster testing is disabled. Set property[%s=true] to enable 
it.", PROPERTY_ENABLE);
     }
   }
 
@@ -97,18 +100,22 @@ public class ClusterTestingModule implements DruidModule
       binder.bind(CoordinatorClient.class)
             .to(FaultyCoordinatorClient.class)
             .in(LazySingleton.class);
-      binder.bind(OverlordClient.class)
-            .to(FaultyOverlordClient.class)
-            .in(LazySingleton.class);
       binder.bind(RemoteTaskActionClientFactory.class)
             .to(FaultyRemoteTaskActionClientFactory.class)
             .in(LazySingleton.class);
     } else if (roles.contains(NodeRole.OVERLORD)) {
-      // If this is the Overlord, bind a faulty storage coordinator
-      log.warn("Running Overlord in cluster testing mode.");
       binder.bind(GlobalTaskLockbox.class)
             .to(FaultyTaskLockbox.class)
             .in(LazySingleton.class);
+    } else if (roles.contains(NodeRole.INDEXER)) {
+      JsonConfigProvider.bind(
+          binder,
+          PROPERTY_OVERLORD_CLIENT_CONFIG,
+          ClusterTestingTaskConfig.OverlordClientConfig.class
+      );
+      binder.bind(OverlordClient.class)
+            .to(FaultyOverlordClient.class)
+            .in(LazySingleton.class);
     }
   }
 
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
index c350c951dab..6a08a7bdc52 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
@@ -112,12 +112,28 @@ public class ClusterTestingTaskConfig
    */
   public static class OverlordClientConfig
   {
-    private static final OverlordClientConfig DEFAULT = new 
OverlordClientConfig();
+    private static final OverlordClientConfig DEFAULT = new 
OverlordClientConfig(null);
+
+    @JsonProperty
+    private final Duration taskStatusDelay;
+
+    @JsonCreator
+    public OverlordClientConfig(
+        @JsonProperty("taskStatusDelay") @Nullable Duration taskStatusDelay
+    )
+    {
+      this.taskStatusDelay = taskStatusDelay;
+    }
+
+    public Duration getTaskStatusDelay()
+    {
+      return taskStatusDelay;
+    }
 
     @Override
     public String toString()
     {
-      return "";
+      return '{' + "taskStatusDelay=" + taskStatusDelay + '}';
     }
   }
 
diff --git 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
index ba96840b833..77a2f26573b 100644
--- 
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
+++ 
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
@@ -20,40 +20,91 @@
 package org.apache.druid.testing.cluster.task;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
-import org.apache.druid.client.coordinator.Coordinator;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocator;
-import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.rpc.indexing.OverlordClientImpl;
 import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Set;
 
 public class FaultyOverlordClient extends OverlordClientImpl
 {
   private static final Logger log = new Logger(FaultyOverlordClient.class);
 
-  private final ClusterTestingTaskConfig testingConfig;
+  private final ObjectMapper jsonMapper;
+  private final ServiceClient serviceClient;
+  private final ClusterTestingTaskConfig.OverlordClientConfig testingConfig;
 
   @Inject
   public FaultyOverlordClient(
-      ClusterTestingTaskConfig testingConfig,
+      ClusterTestingTaskConfig.OverlordClientConfig testingConfig,
       @Json final ObjectMapper jsonMapper,
-      @EscalatedGlobal final ServiceClientFactory clientFactory,
-      @Coordinator final ServiceLocator serviceLocator
+      @IndexingService final ServiceClient serviceClient
   )
   {
-    super(
-        clientFactory.makeClient(
-            NodeRole.COORDINATOR.getJsonName(),
-            serviceLocator,
-            StandardRetryPolicy.builder().maxAttempts(6).build()
-        ),
-        jsonMapper
-    );
+    super(serviceClient, jsonMapper);
+    this.jsonMapper = jsonMapper;
+    this.serviceClient = serviceClient;
     this.testingConfig = testingConfig;
+    log.info("Initialized FaultyOverlordClient with config[%s]", 
testingConfig);
+  }
+
+  @Override
+  public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+      @Nullable String state,
+      @Nullable String dataSource,
+      @Nullable Integer maxCompletedTasks
+  )
+  {
+    addDelayIfConfigured();
+    return super.taskStatuses(state, dataSource, maxCompletedTasks);
+  }
+
+  @Override
+  public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String> 
taskIds)
+  {
+    addDelayIfConfigured();
+    return super.taskStatuses(taskIds);
+  }
+
+  @Override
+  public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
+  {
+    addDelayIfConfigured();
+    return super.taskStatus(taskId);
+  }
+
+  @Override
+  public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
+  {
+    // Apply the requested retry policy to the underlying client instead of
+    // silently dropping it and reusing the original client's policy.
+    return new FaultyOverlordClient(testingConfig, jsonMapper, serviceClient.withRetryPolicy(retryPolicy));
+  }
+
+  private void addDelayIfConfigured()
+  {
+    final Duration delay = testingConfig.getTaskStatusDelay();
+    if (delay == null) {
+      return;
+    }
+
+    try {
+      log.info("Sleeping for [%s] before calling Overlord", delay);
+      Thread.sleep(delay.getMillis());
+    }
+    catch (InterruptedException e) {
+      // Restore the interrupt status so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+      log.info("Interrupted while sleeping before calling the Overlord.");
+    }
   }
 }
diff --git 
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
 
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
index 569ad33e3ce..6cb9d77721c 100644
--- 
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
+++ 
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.guice;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Injector;
 import org.apache.commons.io.FileUtils;
+import org.apache.druid.cli.CliIndexer;
 import org.apache.druid.cli.CliOverlord;
 import org.apache.druid.cli.CliPeon;
 import org.apache.druid.client.coordinator.CoordinatorClient;
@@ -128,9 +129,6 @@ public class ClusterTestingModuleTest
       CoordinatorClient coordinatorClient = 
peonInjector.getInstance(CoordinatorClient.class);
       Assert.assertTrue(coordinatorClient instanceof FaultyCoordinatorClient);
 
-      OverlordClient overlordClient = 
peonInjector.getInstance(OverlordClient.class);
-      Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);
-
       TaskActionClientFactory taskActionClientFactory = 
peonInjector.getInstance(TaskActionClientFactory.class);
       Assert.assertTrue(taskActionClientFactory instanceof 
FaultyRemoteTaskActionClientFactory);
     }
@@ -252,6 +250,26 @@ public class ClusterTestingModuleTest
     }
   }
 
+  @Test
+  public void test_indexerService_hasFaultyOverlordClient_ifTestingIsEnabled()
+  {
+    try {
+      final CliIndexer indexer = new CliIndexer();
+      System.setProperty("druid.unsafe.cluster.testing", "true");
+
+      final Injector baseInjector = new 
StartupInjectorBuilder().forServer().build();
+      baseInjector.injectMembers(indexer);
+
+      final Injector indexerInjector = 
indexer.makeInjector(Set.of(NodeRole.INDEXER));
+
+      OverlordClient overlordClient = 
indexerInjector.getInstance(OverlordClient.class);
+      Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);
+    }
+    finally {
+      System.clearProperty("druid.unsafe.cluster.testing");
+    }
+  }
+
   private static void verifyTestingConfig(ClusterTestingTaskConfig taskConfig)
   {
     Assert.assertNotNull(taskConfig);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 72bd9cff174..7677b4a0371 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -129,7 +129,7 @@ import java.util.stream.Collectors;
  * workers to support deprecated RemoteTaskRunner. So a method 
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
  * which should be removed in the release that removes RemoteTaskRunner legacy 
ZK updation WorkerTaskMonitor class.
  */
-public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
+public class HttpRemoteTaskRunner implements WorkerTaskRunner, 
TaskLogStreamer, WorkerHolder.Listener
 {
   private static final EmittingLogger log = new 
EmittingLogger(HttpRemoteTaskRunner.class);
 
@@ -630,7 +630,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
             httpClient,
             config,
             workersSyncExec,
-            this::taskAddedOrUpdated,
+            this,
             worker,
             expectedAnnouncements
         );
@@ -1507,7 +1507,7 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
     return Optional.fromNullable(provisioningService.getStats());
   }
 
-  @VisibleForTesting
+  @Override
   public void taskAddedOrUpdated(final TaskAnnouncement announcement, final 
WorkerHolder workerHolder)
   {
     final String taskId = announcement.getTaskId();
@@ -1706,6 +1706,14 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @Override
+  public void stateChanged(boolean enabled, WorkerHolder workerHolder)
+  {
+    synchronized (statusLock) {
+      statusLock.notifyAll();
+    }
+  }
+
   @Override
   public Map<String, Long> getTotalTaskSlotCount()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 9d177c65410..4444916e69f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -442,6 +442,7 @@ public class WorkerHolder
         if (isWorkerDisabled != disabled.get()) {
           disabled.set(isWorkerDisabled);
           log.info("Worker[%s] disabled set to [%s].", worker.getHost(), 
isWorkerDisabled);
+          listener.stateChanged(!isWorkerDisabled, WorkerHolder.this);
         }
       }
     };
@@ -450,5 +451,7 @@ public class WorkerHolder
   public interface Listener
   {
     void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder 
workerHolder);
+
+    void stateChanged(boolean enabled, WorkerHolder workerHolder);
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
index 3d0336029c1..d9ea67f0ce9 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
@@ -59,7 +59,20 @@ public class WorkerHolderTest
         EasyMock.createNiceMock(HttpClient.class),
         new HttpRemoteTaskRunnerConfig(),
         EasyMock.createNiceMock(ScheduledExecutorService.class),
-        (taskAnnouncement, holder) -> updates.add(taskAnnouncement),
+        new WorkerHolder.Listener()
+        {
+          @Override
+          public void taskAddedOrUpdated(TaskAnnouncement announcement, 
WorkerHolder workerHolder)
+          {
+            updates.add(announcement);
+          }
+
+          @Override
+          public void stateChanged(boolean enabled, WorkerHolder workerHolder)
+          {
+            // Do nothing
+          }
+        },
         new Worker("http", "localhost", "127.0.0.1", 5, "v0", 
WorkerConfig.DEFAULT_CATEGORY),
         ImmutableList.of(
             TaskAnnouncement.create(
diff --git a/integration-tests-ex/cases/pom.xml 
b/integration-tests-ex/cases/pom.xml
index 7ca37e758fc..703287d4857 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -247,11 +247,6 @@
              <scope>provided</scope>
          </dependency>
 
-        <dependency>
-            <groupId>org.apache.druid</groupId>
-            <artifactId>druid-sql</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-multi-stage-query</artifactId>
@@ -399,15 +394,6 @@
                 <it.category>AzureDeepStorage</it.category>
             </properties>
         </profile>
-        <profile>
-            <id>IT-Query</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <properties>
-                <it.category>Query</it.category>
-            </properties>
-        </profile>
         <profile>
             <id>IT-MultiStageQuery</id>
             <activation>
@@ -417,15 +403,6 @@
                 <it.category>MultiStageQuery</it.category>
             </properties>
         </profile>
-        <profile>
-            <id>IT-MultiStageQueryWithMM</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <properties>
-                <it.category>MultiStageQueryWithMM</it.category>
-            </properties>
-        </profile>
         <profile>
             <id>IT-S3DeepStorage</id>
             <activation>
@@ -444,15 +421,6 @@
                 <it.category>GcsDeepStorage</it.category>
             </properties>
         </profile>
-        <profile>
-            <id>IT-BackwardCompatibilityMain</id>
-            <activation>
-                <activeByDefault>false</activeByDefault>
-            </activation>
-            <properties>
-                <it.category>BackwardCompatibilityMain</it.category>
-            </properties>
-        </profile>
         <profile>
             <id>docker-tests</id>
             <activation>
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
deleted file mode 100644
index 97725c72255..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.categories;
-
-public class MultiStageQueryWithMM
-{
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
deleted file mode 100644
index 252d1325f03..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.categories;
-
-public class Query
-{
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
deleted file mode 100644
index 8fb325b1c65..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.sql.http.SqlQuery;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.apache.druid.testsEx.categories.MultiStageQuery;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQuery.class)
-public class ITKeyStatisticsSketchMergeMode
-{
-  @Inject
-  private MsqTestQueryHelper msqHelper;
-
-  @Inject
-  private DataLoaderHelper dataLoaderHelper;
-
-  @Inject
-  private CoordinatorResourceTestClient coordinatorClient;
-
-  private static final String QUERY_FILE = 
"/multi-stage-query/wikipedia_msq_select_query1.json";
-
-  @Test
-  public void testMsqIngestionParallelMerging() throws Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n"
-            + "CLUSTERED BY \"__time\"",
-            datasource
-        );
-
-    ImmutableMap<String, Object> context = ImmutableMap.of(
-        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
-        ClusterStatisticsMergeMode.PARALLEL
-    );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, 
context, null);
-    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
-  }
-
-  @Test
-  public void testMsqIngestionSequentialMerging() throws Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n"
-            + "CLUSTERED BY \"__time\"",
-            datasource
-        );
-
-    ImmutableMap<String, Object> context = ImmutableMap.of(
-        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
-        ClusterStatisticsMergeMode.SEQUENTIAL
-    );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, 
context, null);
-    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
-  }
-
-
-  @Test
-  public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws 
Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "Replace INTO %s overwrite ALL \n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "where delta=111 "
-            // we add this filter since delta=111 is only present in 
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
-            + "PARTITIONED BY DAY\n"
-            + "CLUSTERED BY \"__time\"",
-            datasource
-        );
-
-    ImmutableMap<String, Object> context = ImmutableMap.of(
-        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
-        ClusterStatisticsMergeMode.SEQUENTIAL,
-        MultiStageQueryContext.CTX_MAX_NUM_TASKS,
-        3
-    );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, 
context, null);
-    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    
msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json",
 datasource);
-  }
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
deleted file mode 100644
index ce6462e225a..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQuery.class)
-public class ITMultiStageQuery extends MultiStageQuery
-{
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
deleted file mode 100644
index 99282ecae4c..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.apache.druid.testsEx.categories.MultiStageQueryWithMM;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-/**
- * As we need to kill the PID of the launched task, these tests should be run 
with middle manager only.
- */
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQueryWithMM.class)
-public class ITMultiStageQueryWorkerFaultTolerance
-{
-  private static final Logger LOG = new 
Logger(ITMultiStageQueryWorkerFaultTolerance.class);
-  @Inject
-  private MsqTestQueryHelper msqHelper;
-
-  @Inject
-  private DataLoaderHelper dataLoaderHelper;
-
-  @Inject
-  private CoordinatorResourceTestClient coordinatorClient;
-
-  @Inject
-  private DruidClusterAdminClient druidClusterAdminClient;
-
-  private static final String QUERY_FILE = 
"/multi-stage-query/wikipedia_msq_select_query_ha.json";
-
-  @Test
-  public void testMsqIngestionAndQuerying() throws Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n"
-            + "CLUSTERED BY \"__time\"",
-            datasource
-        );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
-        queryLocal,
-        ImmutableMap.of(
-            MultiStageQueryContext.CTX_FAULT_TOLERANCE,
-            "true",
-            MultiStageQueryContext.CTX_MAX_NUM_TASKS,
-            3
-        )
-    );
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-
-    String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0";
-    killTaskAbruptly(taskIdToKill);
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
-  }
-
-  private void killTaskAbruptly(String taskIdToKill)
-  {
-    String command = "jps -mlv | grep -i peon | grep -i " + taskIdToKill + " 
|awk  '{print  $1}'";
-
-    ITRetryUtil.retryUntil(() -> {
-
-      Pair<String, String> stdOut = 
druidClusterAdminClient.runCommandInMiddleManagerContainer("/bin/bash", "-c",
-                                                                               
                command
-      );
-      LOG.info(StringUtils.format(
-          "command %s \nstdout: %s\nstderr: %s",
-          command,
-          stdOut.lhs,
-          stdOut.rhs
-      ));
-      if (stdOut.rhs != null && stdOut.rhs.length() != 0) {
-        throw new ISE("Bad command");
-      }
-      String pidToKill = stdOut.lhs.trim();
-      if (pidToKill.length() != 0) {
-        LOG.info("Killing pid %s", pidToKill);
-        final Pair<String, String> killResult = 
druidClusterAdminClient.runCommandInMiddleManagerContainer(
-            "/bin/bash",
-            "-c",
-            "kill -9 " + pidToKill
-        );
-        LOG.info(StringUtils.format("Kill command stdout: %s, stderr: %s", 
killResult.lhs, killResult.rhs));
-        return true;
-      } else {
-        return false;
-      }
-    }, true, 2000, 100, StringUtils.format("Figuring out PID for task[%s] to 
kill abruptly", taskIdToKill));
-  }
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
deleted file mode 100644
index d77d5dfe2c9..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.api.client.util.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.indexer.report.TaskContextReport;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.storage.local.LocalFileExportStorageProvider;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class MultiStageQuery
-{
-  @Inject
-  private MsqTestQueryHelper msqHelper;
-
-  @Inject
-  private DataLoaderHelper dataLoaderHelper;
-
-  @Inject
-  private CoordinatorResourceTestClient coordinatorClient;
-
-  private static final String QUERY_FILE = 
"/multi-stage-query/wikipedia_msq_select_query1.json";
-
-  @Test
-  public void testMsqIngestionAndQuerying() throws Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n"
-            + "PARTITIONED BY DAY\n",
-            datasource
-        );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlTaskStatus sqlTaskStatus = 
msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
-  }
-
-  @Test
-  @Ignore("localfiles() is disabled")
-  public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
-  {
-    String datasource = "dst";
-
-    // Clear up the datasource from the previous runs
-    coordinatorClient.unloadSegmentsForDataSource(datasource);
-
-    String queryLocal =
-        StringUtils.format(
-            "INSERT INTO %s\n"
-            + "SELECT\n"
-            + "  TIME_PARSE(\"timestamp\") AS __time,\n"
-            + "  isRobot,\n"
-            + "  diffUrl,\n"
-            + "  added,\n"
-            + "  countryIsoCode,\n"
-            + "  regionName,\n"
-            + "  channel,\n"
-            + "  flags,\n"
-            + "  delta,\n"
-            + "  isUnpatrolled,\n"
-            + "  isNew,\n"
-            + "  deltaBucket,\n"
-            + "  isMinor,\n"
-            + "  isAnonymous,\n"
-            + "  deleted,\n"
-            + "  cityName,\n"
-            + "  metroCode,\n"
-            + "  namespace,\n"
-            + "  comment,\n"
-            + "  page,\n"
-            + "  commentLength,\n"
-            + "  countryName,\n"
-            + "  user,\n"
-            + "  regionIsoCode\n"
-            + "FROM TABLE(\n"
-            + "  LOCALFILES(\n"
-            + "    files => 
ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
-            + "    format => 'json'\n"
-            + "  ))\n"
-            + "  (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR, 
added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
-            + "   channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled 
VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
-            + "   isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT, 
cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
-            + "   comment VARCHAR, page VARCHAR, commentLength BIGINT, 
countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
-            + "PARTITIONED BY DAY\n",
-            datasource
-        );
-
-    // Submit the task and wait for the datasource to get loaded
-    SqlTaskStatus sqlTaskStatus = 
msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
-    if (sqlTaskStatus.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          sqlTaskStatus.getError()
-      ));
-    }
-
-    msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
-    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-    msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
-  }
-
-  @Test
-  public void testExport() throws Exception
-  {
-    String exportQuery =
-        StringUtils.format(
-            "INSERT INTO extern(%s(exportPath => '%s'))\n"
-            + "AS CSV\n"
-            + "SELECT page, added, delta\n"
-            + "FROM TABLE(\n"
-            + "  EXTERN(\n"
-            + "    
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
-            + "    '{\"type\":\"json\"}',\n"
-            + "    
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
 [...]
-            + "  )\n"
-            + ")\n",
-            LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
-        );
-
-    SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
-
-    msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
-
-    if (exportTask.getState().isFailure()) {
-      Assert.fail(StringUtils.format(
-          "Unable to start the task successfully.\nPossible exception: %s",
-          exportTask.getError()
-      ));
-    }
-
-    String resultQuery = StringUtils.format(
-        "SELECT page, delta, added\n"
-        + "  FROM TABLE(\n"
-        + "    EXTERN(\n"
-        + "      
'{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
-        + "      '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
-        + "    )\n"
-        + "  ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
-        + "   WHERE delta != 0\n"
-        + "   ORDER BY page");
-
-    SqlTaskStatus resultTaskStatus = 
msqHelper.submitMsqTaskSuccesfully(resultQuery);
-
-    msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
-
-    TaskReport.ReportMap statusReport = 
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
-
-    MSQTaskReport taskReport = (MSQTaskReport) 
statusReport.get(MSQTaskReport.REPORT_KEY);
-    if (taskReport == null) {
-      throw new ISE("Unable to fetch the status report for the task [%]", 
resultTaskStatus.getTaskId());
-    }
-    TaskContextReport taskContextReport = (TaskContextReport) 
statusReport.get(TaskContextReport.REPORT_KEY);
-    Assert.assertFalse(taskContextReport.getPayload().isEmpty());
-
-    MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
-        taskReport.getPayload(),
-        "payload"
-    );
-    MSQResultsReport resultsReport = Preconditions.checkNotNull(
-        taskReportPayload.getResults(),
-        "Results report for the task id is empty"
-    );
-
-    List<List<Object>> actualResults = new ArrayList<>();
-
-    for (final Object[] row : resultsReport.getResults()) {
-      actualResults.add(Arrays.asList(row));
-    }
-
-    ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
-        ImmutableList.of("Cherno Alpha", 111, 123),
-        ImmutableList.of("Gypsy Danger", -143, 57),
-        ImmutableList.of("Striker Eureka", 330, 459)
-    );
-
-    Assert.assertEquals(
-        expectedResults,
-        actualResults
-    );
-  }
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
deleted file mode 100644
index 03022da32fa..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.query;
-
-import org.apache.druid.testsEx.categories.Query;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(Query.class)
-public class ITUnionQueryTest extends UnionQueryTest
-{
-}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
deleted file mode 100644
index 34cd2c30eae..00000000000
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.query;
-
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaEventWriter;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.utils.KafkaAdminClient;
-import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
-import org.joda.time.Interval;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.function.Function;
-
-public class UnionQueryTest extends AbstractIndexerTest
-{
-  private static final Logger LOG = new Logger(UnionQueryTest.class);
-  private static final String UNION_SUPERVISOR_TEMPLATE = 
"/query/union_kafka_supervisor_template.json";
-  private static final String UNION_DATA_FILE = "/query/union_data.json";
-  private static final String UNION_QUERIES_RESOURCE = 
"/query/union_queries.json";
-  private static final String UNION_DATASOURCE = "wikipedia_index_test";
-  private String fullDatasourceName;
-
-  @Test
-  public void testUnionQuery() throws Exception
-  {
-    fullDatasourceName = UNION_DATASOURCE + 
config.getExtraDatasourceNameSuffix();
-    final String baseName = fullDatasourceName + UUID.randomUUID();
-    KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
-    List<String> supervisors = new ArrayList<>();
-
-    final int numDatasources = 3;
-    for (int i = 0; i < numDatasources; i++) {
-      String datasource = baseName + "-" + i;
-      streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
-      ITRetryUtil.retryUntil(
-          () -> streamAdminClient.isStreamActive(datasource),
-          true,
-          10000,
-          30,
-          "Wait for stream active"
-      );
-      String supervisorSpec = generateStreamIngestionPropsTransform(
-          datasource,
-          datasource,
-          config
-      ).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
-      LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
-      // Start supervisor
-      String specResponse = indexer.submitSupervisor(supervisorSpec);
-      LOG.info("Submitted supervisor [%s]", specResponse);
-      supervisors.add(specResponse);
-
-      int ctr = 0;
-      try (
-          StreamEventWriter streamEventWriter = new KafkaEventWriter(config, 
false);
-          BufferedReader reader = new BufferedReader(
-              new InputStreamReader(getResourceAsStream(UNION_DATA_FILE), 
StandardCharsets.UTF_8)
-          )
-      ) {
-        String line;
-        while ((line = reader.readLine()) != null) {
-          streamEventWriter.write(datasource, StringUtils.toUtf8(line));
-          ctr++;
-        }
-      }
-      final int numWritten = ctr;
-
-      LOG.info("Waiting for stream indexing tasks to consume events");
-
-      ITRetryUtil.retryUntilTrue(
-          () ->
-              numWritten == this.queryHelper.countRows(
-                  datasource,
-                  Intervals.ETERNITY,
-                  name -> new LongSumAggregatorFactory(name, "count")
-              ),
-          StringUtils.format(
-              "dataSource[%s] consumed [%,d] events, expected [%,d]",
-              datasource,
-              this.queryHelper.countRows(
-                  datasource,
-                  Intervals.ETERNITY,
-                  name -> new LongSumAggregatorFactory(name, "count")
-              ),
-              numWritten
-          )
-      );
-    }
-
-    String queryResponseTemplate = StringUtils.replace(
-        getResourceAsString(UNION_QUERIES_RESOURCE),
-        "%%DATASOURCE%%",
-        baseName
-    );
-
-    queryHelper.testQueriesFromString(queryResponseTemplate);
-
-
-    for (int i = 0; i < numDatasources; i++) {
-      indexer.terminateSupervisor(supervisors.get(i));
-      streamAdminClient.deleteStream(baseName + "-" + i);
-    }
-
-    for (int i = 0; i < numDatasources; i++) {
-      final int datasourceNumber = i;
-      ITRetryUtil.retryUntil(
-          () -> coordinator.areSegmentsLoaded(baseName + "-" + 
datasourceNumber),
-          true,
-          10000,
-          10,
-          "Kafka segments loaded"
-      );
-    }
-
-    queryHelper.testQueriesFromString(queryResponseTemplate);
-
-    for (int i = 0; i < numDatasources; i++) {
-      final String datasource = baseName + "-" + i;
-      List<String> intervals = coordinator.getSegmentIntervals(datasource);
-
-      Collections.sort(intervals);
-      String first = intervals.get(0).split("/")[0];
-      String last = intervals.get(intervals.size() - 1).split("/")[1];
-      Interval interval = Intervals.of(first + "/" + last);
-      coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
-      ITRetryUtil.retryUntilFalse(
-          () -> coordinator.areSegmentsLoaded(datasource),
-          "Segment Unloading"
-      );
-      coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
-    }
-  }
-
-
-  /**
-   * sad version of
-   * {@link 
org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
-   */
-  private Function<String, String> generateStreamIngestionPropsTransform(
-      String streamName,
-      String fullDatasourceName,
-      IntegrationTestingConfig config
-  )
-  {
-    final Map<String, Object> consumerConfigs = 
KafkaConsumerConfigs.getConsumerProperties();
-    final Properties consumerProperties = new Properties();
-    consumerProperties.putAll(consumerConfigs);
-    consumerProperties.setProperty("bootstrap.servers", 
config.getKafkaInternalHost());
-    KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
-    return spec -> {
-      try {
-        spec = StringUtils.replace(
-            spec,
-            "%%DATASOURCE%%",
-            fullDatasourceName
-        );
-        spec = StringUtils.replace(
-            spec,
-            "%%TOPIC_VALUE%%",
-            streamName
-        );
-        return StringUtils.replace(
-            spec,
-            "%%STREAM_PROPERTIES_VALUE%%",
-            jsonMapper.writeValueAsString(consumerProperties)
-        );
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    };
-  }
-}
diff --git 
a/integration-tests-ex/cases/src/test/resources/query/union_data.json 
b/integration-tests-ex/cases/src/test/resources/query/union_data.json
deleted file mode 100644
index b186657dbcf..00000000000
--- a/integration-tests-ex/cases/src/test/resources/query/union_data.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : 
"en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": 
"false", "anonymous": "false", "namespace":"article", "continent":"North 
America", "country":"United States", "region":"Bay Area", "city":"San 
Francisco", "added": 57, "deleted": 200, "delta": -143}
-{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : 
"en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": 
"true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", 
"country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, 
"deleted": 129, "delta": 330}
-{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : 
"ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", 
"robot": "true", "anonymous": "false", "namespace":"article", 
"continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", 
"added": 123, "deleted": 12, "delta": 111}
-{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : 
"zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", 
"robot": "true", "anonymous": "false", "namespace":"wikipedia", 
"continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", 
"added": 905, "deleted": 5, "delta": 900}
-{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : 
"ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", 
"robot": "true", "anonymous": "false", "namespace":"wikipedia", 
"continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", 
"added": 1, "deleted": 10, "delta": -9}
-{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" : 
"en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": 
"false", "anonymous": "false", "namespace":"article", "continent":"North 
America", "country":"United States", "region":"Bay Area", "city":"San 
Francisco", "added": 57, "deleted": 200, "delta": -143}
-{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" : 
"en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": 
"true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", 
"country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, 
"deleted": 129, "delta": 330}
-{"timestamp": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" : 
"ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", 
"robot": "true", "anonymous": "false", "namespace":"article", 
"continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", 
"added": 123, "deleted": 12, "delta": 111}
-{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" : 
"zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", 
"robot": "true", "anonymous": "false", "namespace":"wikipedia", 
"continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", 
"added": 905, "deleted": 5, "delta": 900}
-{"timestamp": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" : 
"ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false", 
"robot": "true", "anonymous": "false", "namespace":"wikipedia", 
"continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", 
"added": 1, "deleted": 10, "delta": -9}
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
 
b/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
deleted file mode 100644
index a37340fe858..00000000000
--- 
a/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
+++ /dev/null
@@ -1,69 +0,0 @@
-{
-  "type": "kafka",
-  "dataSchema": {
-    "dataSource": "%%DATASOURCE%%",
-    "timestampSpec": {
-      "column": "timestamp",
-      "format": "auto"
-    },
-    "dimensionsSpec": {
-      "dimensions": [
-        "page",
-        "language",
-        "user",
-        "unpatrolled",
-        "newPage",
-        "robot",
-        "anonymous",
-        "namespace",
-        "continent",
-        "country",
-        "region",
-        "city"
-      ],
-      "dimensionExclusions": [],
-      "spatialDimensions": []
-    },
-    "metricsSpec": [
-      {
-        "type": "count",
-        "name": "count"
-      },
-      {
-        "type": "doubleSum",
-        "name": "added",
-        "fieldName": "added"
-      },
-      {
-        "type": "doubleSum",
-        "name": "deleted",
-        "fieldName": "deleted"
-      },
-      {
-        "type": "doubleSum",
-        "name": "delta",
-        "fieldName": "delta"
-      }
-    ],
-    "granularitySpec": {
-      "type": "uniform",
-      "segmentGranularity": "DAY",
-      "queryGranularity": "second"
-    }
-  },
-  "tuningConfig": {
-    "type": "kafka",
-    "intermediatePersistPeriod": "PT1H",
-    "maxRowsPerSegment": 5000000,
-    "maxRowsInMemory": 500000
-  },
-  "ioConfig": {
-    "topic": "%%TOPIC_VALUE%%",
-    "consumerProperties": %%STREAM_PROPERTIES_VALUE%%,
-    "taskCount": 2,
-    "replicas": 1,
-    "taskDuration": "PT120S",
-    "useEarliestOffset": true,
-    "inputFormat" : {"type": "json"}
-  }
-}
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/query/union_queries.json 
b/integration-tests-ex/cases/src/test/resources/query/union_queries.json
deleted file mode 100644
index 74dd6db299d..00000000000
--- a/integration-tests-ex/cases/src/test/resources/query/union_queries.json
+++ /dev/null
@@ -1,566 +0,0 @@
-[
-  {
-    "description": "timeseries, filtered, all aggs, all",
-    "query": {
-      "queryType": "timeseries",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "filter": {
-        "type": "selector",
-        "dimension": "language",
-        "value": "en"
-      },
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "added",
-          "name": "added"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "deleted",
-          "name": "deleted"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "delta",
-          "name": "delta"
-        }
-      ],
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T01:02:33.000Z",
-        "result": {
-          "added": 1548.0,
-          "count": 6,
-          "delta": 561.0,
-          "deleted": 987.0,
-          "rows": 6
-        }
-      }
-    ]
-  },
-  {
-    "description": "topN, all aggs, page dim, uniques metric",
-    "query": {
-      "queryType": "topN",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "added",
-          "name": "added"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "deleted",
-          "name": "deleted"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "delta",
-          "name": "delta"
-        }
-      ],
-      "dimension": "page",
-      "metric": "added",
-      "threshold": 3,
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T01:02:33.000Z",
-        "result": [
-          {
-            "added": 2715.0,
-            "count": 3,
-            "page": "Crimson Typhoon",
-            "delta": 2700.0,
-            "deleted": 15.0,
-            "rows": 3
-          },
-          {
-            "added": 1377.0,
-            "count": 3,
-            "page": "Striker Eureka",
-            "delta": 990.0,
-            "deleted": 387.0,
-            "rows": 3
-          },
-          {
-            "added": 369.0,
-            "count": 3,
-            "page": "Cherno Alpha",
-            "delta": 333.0,
-            "deleted": 36.0,
-            "rows": 3
-          }
-        ]
-      }
-    ]
-  },
-  {
-    "description": "topN, all aggs, page dim, count metric, postAggs",
-    "query": {
-      "queryType": "topN",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "added",
-          "name": "added"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "deleted",
-          "name": "deleted"
-        },
-        {
-          "type": "doubleSum",
-          "fieldName": "delta",
-          "name": "delta"
-        }
-      ],
-      "postAggregations": [
-        {
-          "type": "arithmetic",
-          "name": "sumOfAddedDeletedConst",
-          "fn": "+",
-          "fields": [
-            {
-              "type": "fieldAccess",
-              "name": "added",
-              "fieldName": "added"
-            },
-            {
-              "type": "arithmetic",
-              "name": "",
-              "fn": "+",
-              "fields": [
-                {
-                  "type": "fieldAccess",
-                  "name": "deleted",
-                  "fieldName": "deleted"
-                },
-                {
-                  "type": "constant",
-                  "name": "constant",
-                  "value": 1000
-                }
-              ]
-            }
-          ]
-        }
-      ],
-      "dimension": "page",
-      "metric": "added",
-      "threshold": 3,
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T01:02:33.000Z",
-        "result": [
-          {
-            "added": 2715.0,
-            "count": 3,
-            "page": "Crimson Typhoon",
-            "delta": 2700.0,
-            "deleted": 15.0,
-            "sumOfAddedDeletedConst": 3730.0,
-            "rows": 3
-          },
-          {
-            "added": 1377.0,
-            "count": 3,
-            "page": "Striker Eureka",
-            "delta": 990.0,
-            "deleted": 387.0,
-            "sumOfAddedDeletedConst": 2764.0,
-            "rows": 3
-          },
-          {
-            "added": 369.0,
-            "count": 3,
-            "page": "Cherno Alpha",
-            "delta": 333.0,
-            "deleted": 36.0,
-            "sumOfAddedDeletedConst": 1405.0,
-            "rows": 3
-          }
-        ]
-      }
-    ]
-  },
-  {
-    "description": "topN, lexicographic, two aggs, language dim, postAggs",
-    "query": {
-      "queryType": "topN",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        }
-      ],
-      "postAggregations": [
-        {
-          "type": "arithmetic",
-          "name": "sumOfRowsAndCount",
-          "fn": "+",
-          "fields": [
-            {
-              "type": "fieldAccess",
-              "name": "rows",
-              "fieldName": "rows"
-            },
-            {
-              "type": "fieldAccess",
-              "name": "count",
-              "fieldName": "count"
-            }
-          ]
-        }
-      ],
-      "dimension": "language",
-      "metric": {
-        "type": "lexicographic",
-        "previousStop": "a"
-      },
-      "threshold": 3,
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T01:02:33.000Z",
-        "result": [
-          {
-            "sumOfRowsAndCount": 12.0,
-            "count": 6,
-            "language": "en",
-            "rows": 6
-          },
-          {
-            "sumOfRowsAndCount": 6.0,
-            "count": 3,
-            "language": "ja",
-            "rows": 3
-          },
-          {
-            "sumOfRowsAndCount": 6.0,
-            "count": 3,
-            "language": "ru",
-            "rows": 3
-          }
-        ]
-      }
-    ]
-  },
-  {
-    "description": "groupBy, two aggs, namespace dim, postAggs",
-    "query": {
-      "queryType": "groupBy",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        }
-      ],
-      "postAggregations": [
-        {
-          "type": "arithmetic",
-          "name": "sumOfRowsAndCount",
-          "fn": "+",
-          "fields": [
-            {
-              "type": "fieldAccess",
-              "name": "rows",
-              "fieldName": "rows"
-            },
-            {
-              "type": "fieldAccess",
-              "name": "count",
-              "fieldName": "count"
-            }
-          ]
-        }
-      ],
-      "dimensions": ["namespace"],
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "version": "v1",
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "event": {
-          "sumOfRowsAndCount": 12.0,
-          "count": 6,
-          "rows": 6,
-          "namespace": "article"
-        }
-      },
-      {
-        "version": "v1",
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "event": {
-          "sumOfRowsAndCount": 18.0,
-          "count": 9,
-          "rows": 9,
-          "namespace": "wikipedia"
-        }
-      }
-    ]
-  },
-  {
-    "description": "groupBy, two aggs, namespace + robot dim, postAggs",
-    "query": {
-      "queryType": "groupBy",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "intervals": ["2013-08-31/2013-09-01"],
-      "granularity": "all",
-      "aggregations": [
-        {
-          "type": "count",
-          "name": "rows"
-        },
-        {
-          "type": "longSum",
-          "fieldName": "count",
-          "name": "count"
-        }
-      ],
-      "postAggregations": [
-        {
-          "type": "arithmetic",
-          "name": "sumOfRowsAndCount",
-          "fn": "+",
-          "fields": [
-            {
-              "type": "fieldAccess",
-              "name": "rows",
-              "fieldName": "rows"
-            },
-            {
-              "type": "fieldAccess",
-              "name": "count",
-              "fieldName": "count"
-            }
-          ]
-        }
-      ],
-      "dimensions": ["namespace", "robot"],
-      "limitSpec": {
-        "type": "default",
-        "limit": 3,
-        "orderBy": ["robot", "namespace"]
-      },
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "version": "v1",
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "event": {
-          "sumOfRowsAndCount": 6.0,
-          "count": 3,
-          "robot": "false",
-          "rows": 3,
-          "namespace": "article"
-        }
-      },
-      {
-        "version": "v1",
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "event": {
-          "sumOfRowsAndCount": 6.0,
-          "count": 3,
-          "robot": "true",
-          "rows": 3,
-          "namespace": "article"
-        }
-      },
-      {
-        "version": "v1",
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "event": {
-          "sumOfRowsAndCount": 18.0,
-          "count": 9,
-          "robot": "true",
-          "rows": 9,
-          "namespace": "wikipedia"
-        }
-      }
-    ]
-  },
-  {
-    "query": {
-      "queryType": "search",
-      "intervals": ["2013-08-31/2013-09-01"],
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      },
-      "granularity": "all",
-      "query": {
-        "type": "insensitive_contains",
-        "value": "ip"
-      },
-      "context": {
-        "useCache": "true",
-        "populateCache": "true",
-        "timeout": 60000
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T00:00:00.000Z",
-        "result": [
-          {
-            "dimension": "user",
-            "value": "triplets",
-            "count":3
-          },
-          {
-            "dimension": "namespace",
-            "value": "wikipedia",
-            "count":9
-          }
-        ]
-      }
-    ]
-  },
-  {
-    "description": "timeboundary, 1 agg, union",
-    "query": {
-      "queryType": "timeBoundary",
-      "dataSource": {
-        "type": "union",
-        "dataSources": [
-          "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
-          "%%DATASOURCE%%-0"
-        ]
-      }
-    },
-    "expectedResults": [
-      {
-        "timestamp": "2013-08-31T01:02:33.000Z",
-        "result": {
-          "minTime": "2013-08-31T01:02:33.000Z",
-          "maxTime": "2013-09-01T12:41:27.000Z"
-        }
-      }
-    ]
-  }
-]
diff --git a/integration-tests-ex/image/docker-build.sh 
b/integration-tests-ex/image/docker-build.sh
index 4ff5d7a7438..006ffaf092d 100755
--- a/integration-tests-ex/image/docker-build.sh
+++ b/integration-tests-ex/image/docker-build.sh
@@ -55,21 +55,3 @@ docker build -t $DRUID_IT_IMAGE_NAME \
        --build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
        --build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
        .
-
-if [[ -z "${BACKWARD_COMPATIBILITY_IT_ENABLED:-""}" || 
$BACKWARD_COMPATIBILITY_IT_ENABLED != "true" ]]; then
-  echo "Not building previous version image."
-  exit 0
-fi
-
-# Download the previous druid tar
-curl -L $DRUID_PREVIOUS_VERSION_DOWNLOAD_URL --output 
apache-druid-$DRUID_PREVIOUS_VERSION-bin.tar.gz
-
-docker build -t $DRUID_PREVIOUS_IT_IMAGE_NAME \
-       --build-arg DRUID_VERSION=$DRUID_PREVIOUS_VERSION \
-       --build-arg MYSQL_VERSION=$MYSQL_VERSION \
-       --build-arg MARIADB_VERSION=$MARIADB_VERSION \
-       --build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
-       --build-arg HADOOP_VERSION=$HADOOP_VERSION \
-       --build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
-       --build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
-       .
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index e04fe7b7c67..d7279e60347 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -141,6 +141,7 @@ public class MSQWorkerTask extends AbstractTask
   @Override
   public TaskStatus runTask(final TaskToolbox toolbox)
   {
+    emitMetric(toolbox.getEmitter(), "ingest/count", 1);
     try (final IndexerWorkerContext context = 
IndexerWorkerContext.createProductionInstance(this, toolbox, injector)) {
       // We'll run the worker in a separate thread, so we can interrupt the 
thread on shutdown or controller failure.
       final ListeningExecutorService workerExec =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to