This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1734a9b9d5f Move MSQ ITs to embedded tests (#18465)
1734a9b9d5f is described below
commit 1734a9b9d5f2d4e188e00e43e22c972810e7dd2b
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 18:26:33 2025 +0530
Move MSQ ITs to embedded tests (#18465)
Changes:
- Remove revised IT categories `BackwardCompatibilityMain`,
`MultiStageQueryWithMM` and `Query`
- Remove all code for backward compatibility IT as it is already migrated to
`IngestionBackwardCompatibilityDockerTest`
- Move `ITMultiStageQueryWorkerFaultToleranceTest` to
`MSQWorkerFaultToleranceTest`
- Move `ITMultiStageQuery` to `MultiStageQueryTest`
- Move `ITKeyStatisticsSketchMergeMode` to
`MSQKeyStatisticsSketchMergeModeTest`
- Make changes to `ClusterTestingModule`, `FaultyOverlordClient` to allow
fault tolerance testing
---
.github/workflows/reusable-revised-its.yml | 46 --
.github/workflows/revised-its.yml | 41 +-
.../unit-and-integration-tests-unified.yml | 48 --
.../testing/embedded/auth/BasicAuthMSQTest.java | 89 +---
.../testing/embedded/indexing/MoreResources.java | 66 +++
.../testing/embedded/msq/EmbeddedMSQApis.java | 13 +-
.../msq/MSQKeyStatisticsSketchMergeModeTest.java | 189 +++++++
.../embedded/msq/MSQLocalDurableStorage.java | 52 ++
.../embedded/msq/MSQWorkerFaultToleranceTest.java | 135 +++++
.../testing/embedded/msq/MultiStageQueryTest.java | 154 ++++++
.../testing/embedded/query/UnionQueryTest.java | 240 +++++++++
.../apache/druid/guice/ClusterTestingModule.java | 25 +-
.../testing/cluster/ClusterTestingTaskConfig.java | 20 +-
.../testing/cluster/task/FaultyOverlordClient.java | 87 +++-
.../druid/guice/ClusterTestingModuleTest.java | 24 +-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 14 +-
.../druid/indexing/overlord/hrtr/WorkerHolder.java | 3 +
.../indexing/overlord/hrtr/WorkerHolderTest.java | 15 +-
integration-tests-ex/cases/pom.xml | 32 --
.../testsEx/categories/MultiStageQueryWithMM.java | 24 -
.../org/apache/druid/testsEx/categories/Query.java | 24 -
.../msq/ITKeyStatisticsSketchMergeMode.java | 268 ----------
.../druid/testsEx/msq/ITMultiStageQuery.java | 30 --
.../msq/ITMultiStageQueryWorkerFaultTolerance.java | 173 -------
.../apache/druid/testsEx/msq/MultiStageQuery.java | 266 ----------
.../druid/testsEx/query/ITUnionQueryTest.java | 31 --
.../apache/druid/testsEx/query/UnionQueryTest.java | 207 --------
.../cases/src/test/resources/query/union_data.json | 10 -
.../query/union_kafka_supervisor_template.json | 69 ---
.../src/test/resources/query/union_queries.json | 566 ---------------------
integration-tests-ex/image/docker-build.sh | 18 -
.../apache/druid/msq/indexing/MSQWorkerTask.java | 1 +
32 files changed, 1013 insertions(+), 1967 deletions(-)
diff --git a/.github/workflows/reusable-revised-its.yml
b/.github/workflows/reusable-revised-its.yml
index 568cca04352..12a3e7e11fc 100644
--- a/.github/workflows/reusable-revised-its.yml
+++ b/.github/workflows/reusable-revised-its.yml
@@ -57,19 +57,6 @@ on:
AWS_SECRET_ACCESS_KEY:
required: false
type: string
- BACKWARD_COMPATIBILITY_IT_ENABLED:
- required: false
- type: string
- default: false
- DRUID_PREVIOUS_VERSION:
- required: false
- type: string
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
- required: false
- type: string
- DRUID_PREVIOUS_IT_IMAGE_NAME:
- required: false
- type: string
env:
MYSQL_DRIVER_CLASSNAME: ${{ inputs.mysql_driver }} # Used by tests to
connect to metadata store directly.
@@ -119,15 +106,6 @@ jobs:
./druid-container-jdk${{ inputs.build_jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
- - name: Retrieve previous version cached docker image
- id: docker-restore-previous-version
- if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
- uses: actions/cache/restore@v4
- with:
- key: druid-container-jdk${{ inputs.build_jdk }}-version${{
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
- path: |
- ./druid-container-jdk${{ inputs.build_jdk }}-version${{
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
- name: Maven build
if: steps.maven-restore.outputs.cache-hit != 'true' || (
steps.docker-restore.outputs.cache-hit != 'true' &&
steps.targets-restore.outputs.cache-hit != 'true' )
run: |
@@ -137,10 +115,6 @@ jobs:
if: steps.docker-restore.outputs.cache-hit != 'true' ||
steps.maven-restore.outputs.cache-hit != 'true'
env:
docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
- DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -148,15 +122,6 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{
inputs.build_jdk }}.tar.gz
- - name: Save previous version docker image
- if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' &&
(steps.docker-restore.outputs.cache-hit != 'true' ||
steps.maven-restore.outputs.cache-hit != 'true') }}
- env:
- docker-restore: ${{ toJson(steps.docker-restore.outputs) }}
- run: |
- docker tag ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ inputs.build_jdk }}-version${{
inputs.DRUID_PREVIOUS_VERSION }}
- echo ${DRUID_PREVIOUS_IT_IMAGE_NAME}
- docker save "${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip >
druid-container-jdk${{ inputs.build_jdk }}-version${{
inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
- name: Stop and remove docker containers
run: |
echo "Force stopping all containers and pruning"
@@ -168,20 +133,9 @@ jobs:
docker load --input druid-container-jdk${{ inputs.build_jdk }}.tar.gz
docker images
- - name: Load previous version docker image
- if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' }}
- run: |
- docker load --input druid-container-jdk${{ inputs.build_jdk
}}-version${{ inputs.DRUID_PREVIOUS_VERSION }}.tar.gz
- docker images
-
- name: Run IT
id: run-it
timeout-minutes: 60
- env:
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
- DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{
inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
${{ inputs.script }}
diff --git a/.github/workflows/revised-its.yml
b/.github/workflows/revised-its.yml
index 7b71a4a9820..c662cb47d8f 100644
--- a/.github/workflows/revised-its.yml
+++ b/.github/workflows/revised-its.yml
@@ -18,24 +18,6 @@
name: "Revised ITs workflow"
on:
workflow_call:
- inputs:
- BACKWARD_COMPATIBILITY_IT_ENABLED:
- description: "Flag for backward compatibility IT"
- required: false
- default: false
- type: string
- DRUID_PREVIOUS_VERSION:
- description: "Previous druid versions to run the test against."
- required: false
- type: string
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL:
- description: "URL to download the previous druid version."
- required: false
- type: string
- DRUID_PREVIOUS_IT_IMAGE_NAME:
- description: "Druid previous version image name."
- required: false
- type: string
workflow_dispatch:
jobs:
@@ -67,7 +49,7 @@ jobs:
fail-fast: false
matrix:
jdk: [17]
- it: [MultiStageQuery, BatchIndex, MultiStageQueryWithMM, InputSource,
InputFormat, Query, DruidExactCountBitmap]
+ it: [MultiStageQuery, BatchIndex, InputSource, InputFormat,
DruidExactCountBitmap]
indexer: [middleManager]
uses: ./.github/workflows/reusable-revised-its.yml
if: ${{ needs.changes.outputs.core == 'true' ||
needs.changes.outputs.common-extensions == 'true' }}
@@ -95,24 +77,3 @@ jobs:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: miniopassword
-
- backward-compatibility-it:
- needs: changes
- uses: ./.github/workflows/reusable-revised-its.yml
- if: ${{ inputs.BACKWARD_COMPATIBILITY_IT_ENABLED == 'true' &&
(needs.changes.outputs.core == 'true' ||
needs.changes.outputs.common-extensions == 'true') }}
- with:
- build_jdk: 17
- runtime_jdk: 17
- use_indexer: middleManager
- script: ./it.sh github BackwardCompatibilityMain
- it: BackwardCompatibilityMain
- mysql_driver: com.mysql.jdbc.Driver
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{
inputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
- DRUID_PREVIOUS_VERSION: ${{ inputs.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
inputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ inputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
- DRUID_CLOUD_BUCKET: druid-qa
- DRUID_CLOUD_PATH: aws-${{ github.run_id }}-${{ github.run_attempt }}
- AWS_REGION: us-east-1
- AWS_ACCESS_KEY_ID: admin
- AWS_SECRET_ACCESS_KEY: miniopassword
diff --git a/.github/workflows/unit-and-integration-tests-unified.yml
b/.github/workflows/unit-and-integration-tests-unified.yml
index 95648f0087f..9f073646635 100644
--- a/.github/workflows/unit-and-integration-tests-unified.yml
+++ b/.github/workflows/unit-and-integration-tests-unified.yml
@@ -51,30 +51,7 @@ env:
SEGMENT_DOWNLOAD_TIMEOUT_MINS: 5
jobs:
- set-env-var:
- name: Set env var
- runs-on: ubuntu-latest
- outputs:
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{ steps.image_name.outputs.image_name }}
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{ steps.it_enabled.outputs.enabled
}}
- DRUID_PREVIOUS_VERSION: ${{ env.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
env.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- steps:
- - name: Set image name env var
- id: image_name
- run: |
- echo "::set-output
name=image_name::org.apache.druid.integration-tests/test:${{
env.DRUID_PREVIOUS_VERSION }}"
- - name: Set env for enabling backward compatibility it
- id: it_enabled
- run: |
- if [ -n "${{ env.DRUID_PREVIOUS_VERSION }}" ]; then
- echo "::set-output name=enabled::true"
- else
- echo "::set-output name=enabled::false"
- fi
-
build:
- needs: set-env-var
name: "build (jdk${{ matrix.jdk }})"
strategy:
fail-fast: false
@@ -120,25 +97,12 @@ jobs:
./druid-container-jdk${{ matrix.jdk }}.tar.gz
./integration-tests-ex/image/target/env.sh
- - name: Cache previous version image
- id: docker_container_previous_version
- uses: actions/cache@v4
- with:
- key: druid-container-jdk${{ matrix.jdk }}-version${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz-${{ github.sha }}
- path: |
- ./druid-container-jdk${{ matrix.jdk }}-version${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
- name: Maven build
id: maven_build
run: |
./it.sh ci
- name: Container build
- env:
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{
needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
- DRUID_PREVIOUS_VERSION: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
run: |
./it.sh image
source ./integration-tests-ex/image/target/env.sh
@@ -150,13 +114,6 @@ jobs:
echo $DRUID_IT_IMAGE_NAME
docker save "$DRUID_IT_IMAGE_NAME" | gzip > druid-container-jdk${{
matrix.jdk }}.tar.gz
- - name: Save previous version docker image
- if: ${{ needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED ==
'true' }}
- run: |
- docker tag ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }} ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}-jdk${{ matrix.jdk
}}-version${{ needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
- echo ${{ needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
- docker save "${{
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}" | gzip >
druid-container-jdk${{ matrix.jdk }}-version${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}.tar.gz
-
unit-tests:
name: "unit tests"
uses: ./.github/workflows/ci.yml
@@ -168,11 +125,6 @@ jobs:
revised-its:
needs: [build, unit-tests]
uses: ./.github/workflows/revised-its.yml
- with:
- BACKWARD_COMPATIBILITY_IT_ENABLED: ${{
needs.set-env-var.outputs.BACKWARD_COMPATIBILITY_IT_ENABLED }}
- DRUID_PREVIOUS_VERSION: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION }}
- DRUID_PREVIOUS_VERSION_DOWNLOAD_URL: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_VERSION_DOWNLOAD_URL }}
- DRUID_PREVIOUS_IT_IMAGE_NAME: ${{
needs.set-env-var.outputs.DRUID_PREVIOUS_IT_IMAGE_NAME }}
docker-tests:
needs: [build, unit-tests]
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
index 4a636c3c588..87012ce2c7b 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthMSQTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.EmbeddedServiceClient;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.testing.embedded.msq.MSQExportDirectory;
@@ -123,45 +124,11 @@ public class BasicAuthMSQTest extends
EmbeddedClusterTestBase
List<ResourceAction> permissions = ImmutableList.of();
securityClient.setPermissionsToRole(ROLE_1, permissions);
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + " '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n",
- dataSource,
- Resources.DataFile.tinyWiki1Json().getAbsolutePath()
- );
+ String queryLocal = StringUtils.format(
+ MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+ );
verifySqlSubmitFailsWith403Forbidden(queryLocal);
}
@@ -177,45 +144,11 @@ public class BasicAuthMSQTest extends
EmbeddedClusterTestBase
);
securityClient.setPermissionsToRole(ROLE_1, permissions);
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + " '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n",
- dataSource,
- Resources.DataFile.tinyWiki1Json().getAbsolutePath()
- );
+ String queryLocal = StringUtils.format(
+ MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json()
+ );
final SqlTaskStatus taskStatus = userClient.onAnyBroker(
b -> b.submitSqlTask(
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
index b50d4f2e601..205271bcfcf 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
@@ -86,6 +86,72 @@ public class MoreResources
.appendToExisting(false);
}
+ public static class MSQ
+ {
+ /**
+ * SQL to INSERT any of the tiny wiki JSON data files into a new datasource
+ * with DAY granularity. e.g. {@link Resources.DataFile#tinyWiki1Json()}.
+ */
+ public static final String INSERT_TINY_WIKI_JSON =
+ "INSERT INTO %s\n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + " "
+ + "'[{\"type\":\"string\",\"name\":\"timestamp\"},"
+ + "{\"type\":\"string\",\"name\":\"isRobot\"},"
+ + "{\"type\":\"string\",\"name\":\"diffUrl\"},"
+ + "{\"type\":\"long\",\"name\":\"added\"},"
+ + "{\"type\":\"string\",\"name\":\"countryIsoCode\"},"
+ + "{\"type\":\"string\",\"name\":\"regionName\"},"
+ + "{\"type\":\"string\",\"name\":\"channel\"},"
+ + "{\"type\":\"string\",\"name\":\"flags\"},"
+ + "{\"type\":\"long\",\"name\":\"delta\"},"
+ + "{\"type\":\"string\",\"name\":\"isUnpatrolled\"},"
+ + "{\"type\":\"string\",\"name\":\"isNew\"},"
+ + "{\"type\":\"double\",\"name\":\"deltaBucket\"},"
+ + "{\"type\":\"string\",\"name\":\"isMinor\"},"
+ + "{\"type\":\"string\",\"name\":\"isAnonymous\"},"
+ + "{\"type\":\"long\",\"name\":\"deleted\"},"
+ + "{\"type\":\"string\",\"name\":\"cityName\"},"
+ + "{\"type\":\"long\",\"name\":\"metroCode\"},"
+ +
"{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},"
+ + "{\"type\":\"string\",\"name\":\"page\"},"
+ + "{\"type\":\"long\",\"name\":\"commentLength\"},"
+ + "{\"type\":\"string\",\"name\":\"countryName\"},"
+ + "{\"type\":\"string\",\"name\":\"user\"},"
+ + "{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n"
+ + " )\n"
+ + ")\n"
+ + "PARTITIONED BY DAY\n";
+ }
+
/**
* Supervisor spec builder.
*/
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index 71967443d40..d656618db3d 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -83,6 +83,17 @@ public class EmbeddedMSQApis
* @return The result of the SQL as a single CSV string.
*/
public SqlTaskStatus submitTaskSql(String sql, Object... args)
+ {
+ return submitTaskSql(null, sql, args);
+ }
+
+ /**
+ * Submits the given SQL query to any of the brokers (using {@code
BrokerClient})
+ * of the cluster, checks that the task has started and returns the {@link
SqlTaskStatus}.
+ *
+ * @return The result of the SQL as a single CSV string.
+ */
+ public SqlTaskStatus submitTaskSql(Map<String, Object> queryContext, String
sql, Object... args)
{
final SqlTaskStatus taskStatus =
cluster.callApi().onAnyBroker(
@@ -93,7 +104,7 @@ public class EmbeddedMSQApis
false,
false,
false,
- null,
+ queryContext,
null
)
)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
new file mode 100644
index 00000000000..c1d52f9dc7b
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQKeyStatisticsSketchMergeModeTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class MSQKeyStatisticsSketchMergeModeTest extends
EmbeddedClusterTestBase
+{
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+ .setServerMemory(300_000_000L)
+ .addProperty("druid.worker.capacity", "3");
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(indexer)
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedHistorical())
+ .addServer(new EmbeddedRouter());
+ }
+
+ @BeforeAll
+ public void initTestClient()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+ }
+
+ @Test
+ public void testMsqIngestionParallelMerging()
+ {
+ String queryLocal = StringUtils.format(
+ MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json()
+ );
+
+ Map<String, Object> context = Map.of(
+ MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+ ClusterStatisticsMergeMode.PARALLEL
+ );
+
+ final SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context,
queryLocal);
+ cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(),
overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+ cluster.callApi().verifySqlQuery(
+ "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+ dataSource,
+ "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+ + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+ + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+ );
+ }
+
+ @Test
+ public void testMsqIngestionSequentialMerging()
+ {
+ String queryLocal = StringUtils.format(
+ MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json()
+ );
+
+ Map<String, Object> context = Map.of(
+ MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+ ClusterStatisticsMergeMode.SEQUENTIAL
+ );
+
+ SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
+ cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(),
overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+ cluster.callApi().verifySqlQuery(
+ "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+ dataSource,
+ "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+ + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+ + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+ );
+ }
+
+
+ @Test
+ public void testMsqIngestionSequentialMergingWithEmptyStatistics()
+ {
+ String queryLocal =
+ StringUtils.format(
+ "Replace INTO %s overwrite ALL \n"
+ + "SELECT\n"
+ + " TIME_PARSE(\"timestamp\") AS __time,\n"
+ + " isRobot,\n"
+ + " diffUrl,\n"
+ + " added,\n"
+ + " countryIsoCode,\n"
+ + " regionName,\n"
+ + " channel,\n"
+ + " flags,\n"
+ + " delta,\n"
+ + " isUnpatrolled,\n"
+ + " isNew,\n"
+ + " deltaBucket,\n"
+ + " isMinor,\n"
+ + " isAnonymous,\n"
+ + " deleted,\n"
+ + " cityName,\n"
+ + " metroCode,\n"
+ + " namespace,\n"
+ + " comment,\n"
+ + " page,\n"
+ + " commentLength,\n"
+ + " countryName,\n"
+ + " user,\n"
+ + " regionIsoCode\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"%s\",\"%s\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
+ + " )\n"
+ + ")\n"
+ + "where delta=111 "
+ // we add this filter since delta=111 is only present in
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
+ + "PARTITIONED BY DAY\n"
+ + "CLUSTERED BY \"__time\"",
+ dataSource,
+ Resources.DataFile.tinyWiki1Json(),
+ Resources.DataFile.tinyWiki2Json()
+ );
+
+ Map<String, Object> context = Map.of(
+ MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+ ClusterStatisticsMergeMode.SEQUENTIAL,
+ MultiStageQueryContext.CTX_MAX_NUM_TASKS,
+ 3
+ );
+
+ SqlTaskStatus sqlTaskStatus = msqApis.submitTaskSql(context, queryLocal);
+ cluster.callApi().waitForTaskToSucceed(sqlTaskStatus.getTaskId(),
overlord);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+ cluster.callApi().verifySqlQuery(
+ "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+ dataSource,
+ "2013-08-31T07:11:21.000Z,,123,111,12,article"
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
new file mode 100644
index 00000000000..7d1fa02d60d
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQLocalDurableStorage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedResource;
+
+import java.io.File;
+
+/**
+ * Embedded resource to use durable local storage for MSQ tasks.
+ */
+public class MSQLocalDurableStorage implements EmbeddedResource
+{
+ @Override
+ public void start() throws Exception
+ {
+ // do nothing
+ }
+
+ @Override
+ public void onStarted(EmbeddedDruidCluster cluster)
+ {
+ final File baseDir =
cluster.getTestFolder().getOrCreateFolder("msq-durable-storage");
+ cluster.addCommonProperty("druid.msq.intermediate.storage.enable", "true")
+ .addCommonProperty("druid.msq.intermediate.storage.type", "local")
+ .addCommonProperty("druid.msq.intermediate.storage.basePath",
baseDir.getAbsolutePath());
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ // do nothing
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
new file mode 100644
index 00000000000..4d1299adc88
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MSQWorkerFaultToleranceTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.guice.ClusterTestingModule;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+/**
+ * Test to verify that cancelled worker tasks are retried when fault tolerance
+ * is enabled. This test uses the {@link ClusterTestingModule} to create a
+ * faulty Indexer which blocks the completion of the worker task. This allows
+ * time to kill off the worker before it can finish, thus triggering a
relaunch.
+ */
+public class MSQWorkerFaultToleranceTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+ .addProperty("druid.plaintextPort", "7091")
+ .addProperty("druid.worker.capacity", "1");
+
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addExtension(ClusterTestingModule.class)
+ .addResource(new MSQLocalDurableStorage())
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(indexer)
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedHistorical());
+ }
+
+ @BeforeAll
+ public void initTestClient()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+ }
+
+ @Test
+ public void test_cancelledWorker_isRetried_ifFaultToleranceIsEnabled()
throws Exception
+ {
+ final String queryLocal = StringUtils.format(
+ MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+ );
+
+ // Run the MSQ task in fault tolerance mode
+ final SqlTaskStatus taskStatus = msqApis.submitTaskSql(
+ Map.of("faultTolerance", true),
+ queryLocal
+ );
+
+ // Add a faulty Indexer to the cluster so that worker is launched but
doesn't finish
+ final EmbeddedIndexer faultyIndexer = new EmbeddedIndexer()
+ .addProperty("druid.unsafe.cluster.testing", "true")
+
.addProperty("druid.unsafe.cluster.testing.overlordClient.taskStatusDelay",
"PT1H")
+ .addProperty("druid.worker.capacity", "1");
+ cluster.addServer(faultyIndexer);
+ faultyIndexer.start();
+
+ // Let the worker run for a bit so that controller task moves to
READING_INPUT phase
+ final ServiceMetricEvent matchingEvent =
faultyIndexer.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("ingest/count")
+ );
+ final String workerTaskId = (String)
matchingEvent.getUserDims().get(DruidMetrics.TASK_ID);
+ Thread.sleep(100);
+
+ // Cancel the worker task and verify that it has failed
+ cluster.callApi().onLeaderOverlord(o -> o.cancelTask(workerTaskId));
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+ );
+ faultyIndexer.stop();
+
+ // Add a functional Indexer so that the worker is relaunched successfully
+ final EmbeddedIndexer functionalIndexer = new EmbeddedIndexer()
+ .addProperty("druid.worker.capacity", "1");
+ cluster.addServer(functionalIndexer);
+ functionalIndexer.start();
+
+ // Verify that the controller task eventually succeeds
+ cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(),
overlord.latchableEmitter());
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+
+ cluster.callApi().verifySqlQuery(
+ "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
+ dataSource,
+ "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
+ + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
+ + "2013-08-31T07:11:21.000Z,,123,111,12,article"
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
new file mode 100644
index 00000000000..e12e3518798
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiStageQueryTest extends EmbeddedClusterTestBase
+{
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+ .setServerMemory(300_000_000L)
+ .addProperty("druid.worker.capacity", "2");
+ private final MSQExportDirectory exportDirectory = new MSQExportDirectory();
+
+ private EmbeddedMSQApis msqApis;
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addResource(exportDirectory)
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(indexer)
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedHistorical());
+ }
+
+ @BeforeAll
+ public void initTestClient()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+ }
+
  /**
   * Ingests the tiny wiki JSON file using an MSQ task and verifies the
   * ingested rows with a SQL query.
   */
  @Test
  public void testMsqIngestionAndQuerying()
  {
    final String sql = StringUtils.format(
        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
        dataSource,
        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
    );

    // Submit the MSQ task and wait for ingestion to finish and segments to load
    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);

    cluster.callApi().verifySqlQuery(
        "SELECT __time, isRobot, added, delta, deleted, namespace FROM %s",
        dataSource,
        "2013-08-31T01:02:33.000Z,,57,-143,200,article\n"
        + "2013-08-31T03:32:45.000Z,,459,330,129,wikipedia\n"
        + "2013-08-31T07:11:21.000Z,,123,111,12,article"
    );
  }
+
+ @Test
+ public void testExport()
+ {
+ final String exportSql =
+ StringUtils.format(
+ "INSERT INTO extern(local(exportPath => '%s'))\n"
+ + "AS CSV\n"
+ + "SELECT page, added, delta\n"
+ + "FROM TABLE(\n"
+ + " EXTERN(\n"
+ + " '{\"type\":\"local\",\"files\":[\"%s\"]}',\n"
+ + " '{\"type\":\"json\"}',\n"
+ + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
+ + " )\n"
+ + ")\n",
+ new File(exportDirectory.get(), dataSource).getAbsolutePath(),
+ Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+ );
+
+ final SqlTaskStatus taskStatus = msqApis.submitTaskSql(exportSql);
+ cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(),
overlord.latchableEmitter());
+
+ final String selectSql = StringUtils.format(
+ "SELECT page, delta, added\n"
+ + " FROM TABLE(\n"
+ + " EXTERN(\n"
+ + "
'{\"type\":\"local\",\"baseDir\":\"%s\",\"filter\":\"*.csv\"}',\n"
+ + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
+ + " )\n"
+ + " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
+ + " WHERE delta != 0\n"
+ + " ORDER BY page",
+ exportDirectory.get()
+ );
+
+ final MSQTaskReportPayload statusReport =
msqApis.runTaskSqlAndGetReport(selectSql);
+ Assertions.assertNotNull(statusReport);
+ Assertions.assertNotNull(statusReport.getResults());
+
+ MSQResultsReport resultsReport = statusReport.getResults();
+
+ List<List<Object>> actualResults = new ArrayList<>();
+ for (final Object[] row : resultsReport.getResults()) {
+ actualResults.add(Arrays.asList(row));
+ }
+
+ List<List<Object>> expectedResults = List.of(
+ List.of("Cherno Alpha", 111, 123),
+ List.of("Gypsy Danger", -143, 57),
+ List.of("Striker Eureka", 330, 459)
+ );
+
+ Assertions.assertEquals(
+ expectedResults,
+ actualResults
+ );
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
new file mode 100644
index 00000000000..05a47e8f2d6
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/UnionQueryTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule;
+import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchModule;
+import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.search.InsensitiveContainsSearchQuerySpec;
+import org.apache.druid.query.topn.LexicographicTopNMetricSpec;
+import org.apache.druid.query.topn.TopNQueryBuilder;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
/**
 * Verifies native and SQL queries over a union of multiple datasources.
 * Each datasource is ingested with the same data, so union results are
 * expected to be exact multiples of the single-datasource results.
 */
public class UnionQueryTest extends EmbeddedClusterTestBase
{
  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();

  @Override
  protected EmbeddedDruidCluster createCluster()
  {
    // Sketch extensions are loaded since the index task uses sketch aggregators
    return EmbeddedDruidCluster
        .withEmbeddedDerbyAndZookeeper()
        .useLatchableEmitter()
        .addExtensions(SketchModule.class, HllSketchModule.class, DoublesSketchModule.class)
        .addServer(overlord)
        .addServer(coordinator)
        .addServer(new EmbeddedIndexer())
        .addServer(new EmbeddedBroker())
        .addServer(new EmbeddedHistorical());
  }

  @Test
  public void test_ingestData_andVerifyNativeAndSQLQueries()
  {
    final int numDatasources = 3;

    final List<String> datasourceNames = IntStream
        .range(0, numDatasources)
        .mapToObj(i -> EmbeddedClusterApis.createTestDatasourceName())
        .collect(Collectors.toList());

    // Ingest identical data into each datasource and wait for segments to load
    for (String datasourceName : datasourceNames) {
      final Task task = MoreResources.Task.INDEX_TASK_WITH_AGGREGATORS
          .get()
          .dataSource(datasourceName)
          .withId(IdUtils.getRandomId());
      cluster.callApi().runTask(task, overlord);
      cluster.callApi().waitForAllSegmentsToBeAvailable(datasourceName, coordinator);
    }

    // Verify some native queries
    final DataSource unionDatasource = new UnionDataSource(
        datasourceNames.stream()
                       .map(TableDataSource::new)
                       .collect(Collectors.toList())
    );

    // Timeseries query: aggregates are 3x the single-datasource values
    verifyQuery(
        Druids
            .newTimeseriesQueryBuilder()
            .intervals("2013-08-31/2013-09-01")
            .dataSource(unionDatasource)
            .filters("language", "en")
            .aggregators(
                new CountAggregatorFactory("rows"),
                new LongSumAggregatorFactory("count", "ingested_events"),
                new DoubleSumAggregatorFactory("added", "added"),
                new DoubleSumAggregatorFactory("deleted", "deleted"),
                new DoubleSumAggregatorFactory("delta", "delta")
            )
            .build(),
        List.of(
            Map.of(
                "timestamp", "2013-08-31T01:02:33.000Z",
                "result", Map.of("count", 6, "delta", 561.0, "deleted", 987.0, "rows", 6, "added", 1548.0)
            )
        )
    );

    // Search query
    verifyQuery(
        Druids
            .newSearchQueryBuilder()
            .dataSource(unionDatasource)
            .intervals("2013-08-31/2013-09-01")
            .granularity(Granularities.ALL)
            .query(new InsensitiveContainsSearchQuerySpec("ip"))
            .build(),
        List.of(
            Map.of(
                "timestamp", "2013-08-31T00:00:00.000Z",
                "result", List.of(
                    Map.of("dimension", "user", "value", "triplets", "count", 3),
                    Map.of("dimension", "namespace", "value", "wikipedia", "count", 9)
                )
            )
        )
    );

    // TopN lexicographic
    verifyQuery(
        new TopNQueryBuilder()
            .dataSource(unionDatasource)
            .intervals("2013-08-31/2013-09-01")
            .granularity(Granularities.ALL)
            .aggregators(
                new CountAggregatorFactory("rows"),
                new LongSumAggregatorFactory("count", "ingested_events")
            )
            .postAggregators(
                new ArithmeticPostAggregator(
                    "sumOfRowsAndCount",
                    "+",
                    List.of(new FieldAccessPostAggregator(null, "rows"), new FieldAccessPostAggregator(null, "count"))
                )
            )
            .dimension("language")
            .metric(new LexicographicTopNMetricSpec("a"))
            .threshold(3)
            .build(),
        List.of(
            Map.of(
                "timestamp", "2013-08-31T01:02:33.000Z",
                "result",
                List.of(
                    Map.of("sumOfRowsAndCount", 12.0, "count", 6, "language", "en", "rows", 6),
                    Map.of("sumOfRowsAndCount", 6.0, "count", 3, "language", "ja", "rows", 3),
                    Map.of("sumOfRowsAndCount", 6.0, "count", 3, "language", "ru", "rows", 3)
                )
            )
        )
    );

    // Verify some SQL queries
    cluster.callApi().verifySqlQuery(
        "SELECT page, COUNT(*), SUM(ingested_events), SUM(added), SUM(deleted), SUM(delta) FROM (%s)"
        + " WHERE __time >= '2013-08-31' AND __time < '2013-09-01'"
        + " GROUP BY 1 ORDER BY 4 DESC LIMIT 3",
        unionAll("SELECT * FROM %s", datasourceNames),
        "Crimson Typhoon,3,3,2715.0,15.0,2700.0\n"
        + "Striker Eureka,3,3,1377.0,387.0,990.0\n"
        + "Cherno Alpha,3,3,369.0,36.0,333.0"
    );
    cluster.callApi().verifySqlQuery(
        "SELECT MIN(__time), MAX(__time) FROM (%s)",
        unionAll("SELECT * FROM %s", datasourceNames),
        "2013-08-31T01:02:33.000Z,2013-09-01T12:41:27.000Z"
    );
    cluster.callApi().verifySqlQuery(
        "SELECT COUNT(*), SUM(ingested_events), SUM(added), SUM(deleted), SUM(delta) FROM (%s)"
        + " WHERE \"language\" = 'en' AND __time >= '2013-08-31' AND __time < '2013-09-01'",
        unionAll("SELECT * FROM %s", datasourceNames),
        "6,6,1548.0,987.0,561.0"
    );
    // UNION ALL of whole aggregate queries: one identical result row per datasource
    cluster.callApi().verifySqlQuery(
        unionAll(
            "SELECT COUNT(*), SUM(ingested_events), SUM(added), SUM(deleted), SUM(delta)"
            + " FROM %s"
            + " WHERE \"language\" = 'en' AND __time >= '2013-08-31' AND __time < '2013-09-01'",
            datasourceNames
        ),
        null,
        "2,2,516.0,329.0,187.0\n"
        + "2,2,516.0,329.0,187.0\n"
        + "2,2,516.0,329.0,187.0"
    );
  }

  /**
   * Creates a SQL for each of the datasources and then combines them with {@code UNION ALL}.
   */
  private String unionAll(String sqlFormat, List<String> datasourceNames)
  {
    return datasourceNames.stream()
                          .map(ds -> StringUtils.format(sqlFormat, ds))
                          .collect(Collectors.joining("\nUNION ALL\n"));
  }

  /**
   * Submits the given native query to a Broker and asserts that the JSON
   * response matches the expected result.
   */
  private void verifyQuery(Query<?> query, List<Map<String, Object>> expectedResult)
  {
    final String resultAsJson = cluster.callApi().onAnyBroker(b -> b.submitNativeQuery(query));
    final List<Map<String, Object>> resultList = JacksonUtils.readValue(
        TestHelper.JSON_MAPPER,
        resultAsJson.getBytes(StandardCharsets.UTF_8),
        new TypeReference<>() {}
    );
    Assertions.assertEquals(expectedResult, resultList);
  }
}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
index 4c32caf1f6b..4c48d1ae760 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -54,6 +54,8 @@ import java.util.Set;
public class ClusterTestingModule implements DruidModule
{
private static final Logger log = new Logger(ClusterTestingModule.class);
+ private static final String PROPERTY_ENABLE = "druid.unsafe.cluster.testing";
+ private static final String PROPERTY_OVERLORD_CLIENT_CONFIG =
"druid.unsafe.cluster.testing.overlordClient";
private Set<NodeRole> roles;
private boolean isClusterTestingEnabled = false;
@@ -65,7 +67,7 @@ public class ClusterTestingModule implements DruidModule
)
{
this.isClusterTestingEnabled = Boolean.parseBoolean(
- props.getProperty("druid.unsafe.cluster.testing", "false")
+ props.getProperty(PROPERTY_ENABLE, "false")
);
this.roles = roles;
}
@@ -75,13 +77,14 @@ public class ClusterTestingModule implements DruidModule
{
if (isClusterTestingEnabled) {
log.warn(
- "Running service in cluster testing mode. This is an unsafe
test-only"
+ "Running service with roles[%s] in cluster testing mode. This is an
unsafe test-only"
+ " mode and must never be used in a production cluster."
- + " Set property[druid.unsafe.cluster.testing=false] to disable
testing mode."
+ + " Set property[%s=false] to disable testing mode.",
+ roles, PROPERTY_ENABLE
);
bindDependenciesForClusterTestingMode(binder);
} else {
- log.warn("Cluster testing is disabled. Set
property[druid.unsafe.cluster.testing=true] to enable it.");
+ log.info("Cluster testing is disabled. Set property[%s=true] to enable
it.", PROPERTY_ENABLE);
}
}
@@ -97,18 +100,22 @@ public class ClusterTestingModule implements DruidModule
binder.bind(CoordinatorClient.class)
.to(FaultyCoordinatorClient.class)
.in(LazySingleton.class);
- binder.bind(OverlordClient.class)
- .to(FaultyOverlordClient.class)
- .in(LazySingleton.class);
binder.bind(RemoteTaskActionClientFactory.class)
.to(FaultyRemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
} else if (roles.contains(NodeRole.OVERLORD)) {
- // If this is the Overlord, bind a faulty storage coordinator
- log.warn("Running Overlord in cluster testing mode.");
binder.bind(GlobalTaskLockbox.class)
.to(FaultyTaskLockbox.class)
.in(LazySingleton.class);
+ } else if (roles.contains(NodeRole.INDEXER)) {
+ JsonConfigProvider.bind(
+ binder,
+ PROPERTY_OVERLORD_CLIENT_CONFIG,
+ ClusterTestingTaskConfig.OverlordClientConfig.class
+ );
+ binder.bind(OverlordClient.class)
+ .to(FaultyOverlordClient.class)
+ .in(LazySingleton.class);
}
}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
index c350c951dab..6a08a7bdc52 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/ClusterTestingTaskConfig.java
@@ -112,12 +112,28 @@ public class ClusterTestingTaskConfig
*/
public static class OverlordClientConfig
{
- private static final OverlordClientConfig DEFAULT = new
OverlordClientConfig();
+ private static final OverlordClientConfig DEFAULT = new
OverlordClientConfig(null);
+
+ @JsonProperty
+ private final Duration taskStatusDelay;
+
+ @JsonCreator
+ public OverlordClientConfig(
+ @JsonProperty("taskStatusDelay") @Nullable Duration taskStatusDelay
+ )
+ {
+ this.taskStatusDelay = taskStatusDelay;
+ }
+
+ public Duration getTaskStatusDelay()
+ {
+ return taskStatusDelay;
+ }
@Override
public String toString()
{
- return "";
+ return '{' + "taskStatusDelay=" + taskStatusDelay + '}';
}
}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
index ba96840b833..77a2f26573b 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyOverlordClient.java
@@ -20,40 +20,91 @@
package org.apache.druid.testing.cluster.task;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
-import org.apache.druid.client.coordinator.Coordinator;
-import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocator;
-import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClientImpl;
import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
+import org.joda.time.Duration;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Set;
public class FaultyOverlordClient extends OverlordClientImpl
{
private static final Logger log = new Logger(FaultyOverlordClient.class);
- private final ClusterTestingTaskConfig testingConfig;
+ private final ObjectMapper jsonMapper;
+ private final ServiceClient serviceClient;
+ private final ClusterTestingTaskConfig.OverlordClientConfig testingConfig;
@Inject
public FaultyOverlordClient(
- ClusterTestingTaskConfig testingConfig,
+ ClusterTestingTaskConfig.OverlordClientConfig testingConfig,
@Json final ObjectMapper jsonMapper,
- @EscalatedGlobal final ServiceClientFactory clientFactory,
- @Coordinator final ServiceLocator serviceLocator
+ @IndexingService final ServiceClient serviceClient
)
{
- super(
- clientFactory.makeClient(
- NodeRole.COORDINATOR.getJsonName(),
- serviceLocator,
- StandardRetryPolicy.builder().maxAttempts(6).build()
- ),
- jsonMapper
- );
+ super(serviceClient, jsonMapper);
+ this.jsonMapper = jsonMapper;
+ this.serviceClient = serviceClient;
this.testingConfig = testingConfig;
+ log.info("Initialized FaultyOverlordClient with config[%s]",
testingConfig);
+ }
+
+ @Override
+ public ListenableFuture<CloseableIterator<TaskStatusPlus>> taskStatuses(
+ @Nullable String state,
+ @Nullable String dataSource,
+ @Nullable Integer maxCompletedTasks
+ )
+ {
+ addDelayIfConfigured();
+ return super.taskStatuses(state, dataSource, maxCompletedTasks);
+ }
+
+ @Override
+ public ListenableFuture<Map<String, TaskStatus>> taskStatuses(Set<String>
taskIds)
+ {
+ addDelayIfConfigured();
+ return super.taskStatuses(taskIds);
+ }
+
+ @Override
+ public ListenableFuture<TaskStatusResponse> taskStatus(String taskId)
+ {
+ addDelayIfConfigured();
+ return super.taskStatus(taskId);
+ }
+
+ @Override
+ public OverlordClientImpl withRetryPolicy(ServiceRetryPolicy retryPolicy)
+ {
+ return new FaultyOverlordClient(testingConfig, jsonMapper, serviceClient);
+ }
+
+ private void addDelayIfConfigured()
+ {
+ final Duration delay = testingConfig.getTaskStatusDelay();
+ if (delay == null) {
+ return;
+ }
+
+ try {
+ log.info("Sleeping for [%s] before calling Overlord", delay);
+ Thread.sleep(delay.getMillis());
+ }
+ catch (InterruptedException e) {
+ log.info("Interrupted while sleeping before task action.");
+ }
}
}
diff --git
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
index 569ad33e3ce..6cb9d77721c 100644
---
a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
+++
b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.commons.io.FileUtils;
+import org.apache.druid.cli.CliIndexer;
import org.apache.druid.cli.CliOverlord;
import org.apache.druid.cli.CliPeon;
import org.apache.druid.client.coordinator.CoordinatorClient;
@@ -128,9 +129,6 @@ public class ClusterTestingModuleTest
CoordinatorClient coordinatorClient =
peonInjector.getInstance(CoordinatorClient.class);
Assert.assertTrue(coordinatorClient instanceof FaultyCoordinatorClient);
- OverlordClient overlordClient =
peonInjector.getInstance(OverlordClient.class);
- Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);
-
TaskActionClientFactory taskActionClientFactory =
peonInjector.getInstance(TaskActionClientFactory.class);
Assert.assertTrue(taskActionClientFactory instanceof
FaultyRemoteTaskActionClientFactory);
}
@@ -252,6 +250,26 @@ public class ClusterTestingModuleTest
}
}
+ @Test
+ public void test_indexerService_hasFaultyOverlordClient_ifTestingIsEnabled()
+ {
+ try {
+ final CliIndexer indexer = new CliIndexer();
+ System.setProperty("druid.unsafe.cluster.testing", "true");
+
+ final Injector baseInjector = new
StartupInjectorBuilder().forServer().build();
+ baseInjector.injectMembers(indexer);
+
+ final Injector indexerInjector =
indexer.makeInjector(Set.of(NodeRole.INDEXER));
+
+ OverlordClient overlordClient =
indexerInjector.getInstance(OverlordClient.class);
+ Assert.assertTrue(overlordClient instanceof FaultyOverlordClient);
+ }
+ finally {
+ System.clearProperty("druid.unsafe.cluster.testing");
+ }
+ }
+
private static void verifyTestingConfig(ClusterTestingTaskConfig taskConfig)
{
Assert.assertNotNull(taskConfig);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 72bd9cff174..7677b4a0371 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -129,7 +129,7 @@ import java.util.stream.Collectors;
* workers to support deprecated RemoteTaskRunner. So a method
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
* which should be removed in the release that removes RemoteTaskRunner legacy
ZK updation WorkerTaskMonitor class.
*/
-public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
+public class HttpRemoteTaskRunner implements WorkerTaskRunner,
TaskLogStreamer, WorkerHolder.Listener
{
private static final EmittingLogger log = new
EmittingLogger(HttpRemoteTaskRunner.class);
@@ -630,7 +630,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
httpClient,
config,
workersSyncExec,
- this::taskAddedOrUpdated,
+ this,
worker,
expectedAnnouncements
);
@@ -1507,7 +1507,7 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
return Optional.fromNullable(provisioningService.getStats());
}
- @VisibleForTesting
+ @Override
public void taskAddedOrUpdated(final TaskAnnouncement announcement, final
WorkerHolder workerHolder)
{
final String taskId = announcement.getTaskId();
@@ -1706,6 +1706,14 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
}
}
+ @Override
+ public void stateChanged(boolean enabled, WorkerHolder workerHolder)
+ {
+ synchronized (statusLock) {
+ statusLock.notifyAll();
+ }
+ }
+
@Override
public Map<String, Long> getTotalTaskSlotCount()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index 9d177c65410..4444916e69f 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -442,6 +442,7 @@ public class WorkerHolder
if (isWorkerDisabled != disabled.get()) {
disabled.set(isWorkerDisabled);
log.info("Worker[%s] disabled set to [%s].", worker.getHost(),
isWorkerDisabled);
+ listener.stateChanged(!isWorkerDisabled, WorkerHolder.this);
}
}
};
@@ -450,5 +451,7 @@ public class WorkerHolder
public interface Listener
{
void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder
workerHolder);
+
+ void stateChanged(boolean enabled, WorkerHolder workerHolder);
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
index 3d0336029c1..d9ea67f0ce9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java
@@ -59,7 +59,20 @@ public class WorkerHolderTest
EasyMock.createNiceMock(HttpClient.class),
new HttpRemoteTaskRunnerConfig(),
EasyMock.createNiceMock(ScheduledExecutorService.class),
- (taskAnnouncement, holder) -> updates.add(taskAnnouncement),
+ new WorkerHolder.Listener()
+ {
+ @Override
+ public void taskAddedOrUpdated(TaskAnnouncement announcement,
WorkerHolder workerHolder)
+ {
+ updates.add(announcement);
+ }
+
+ @Override
+ public void stateChanged(boolean enabled, WorkerHolder workerHolder)
+ {
+ // Do nothing
+ }
+ },
new Worker("http", "localhost", "127.0.0.1", 5, "v0",
WorkerConfig.DEFAULT_CATEGORY),
ImmutableList.of(
TaskAnnouncement.create(
diff --git a/integration-tests-ex/cases/pom.xml
b/integration-tests-ex/cases/pom.xml
index 7ca37e758fc..703287d4857 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -247,11 +247,6 @@
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.druid</groupId>
- <artifactId>druid-sql</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-multi-stage-query</artifactId>
@@ -399,15 +394,6 @@
<it.category>AzureDeepStorage</it.category>
</properties>
</profile>
- <profile>
- <id>IT-Query</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
- <it.category>Query</it.category>
- </properties>
- </profile>
<profile>
<id>IT-MultiStageQuery</id>
<activation>
@@ -417,15 +403,6 @@
<it.category>MultiStageQuery</it.category>
</properties>
</profile>
- <profile>
- <id>IT-MultiStageQueryWithMM</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
- <it.category>MultiStageQueryWithMM</it.category>
- </properties>
- </profile>
<profile>
<id>IT-S3DeepStorage</id>
<activation>
@@ -444,15 +421,6 @@
<it.category>GcsDeepStorage</it.category>
</properties>
</profile>
- <profile>
- <id>IT-BackwardCompatibilityMain</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
- <it.category>BackwardCompatibilityMain</it.category>
- </properties>
- </profile>
<profile>
<id>docker-tests</id>
<activation>
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
deleted file mode 100644
index 97725c72255..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/MultiStageQueryWithMM.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.categories;
-
-public class MultiStageQueryWithMM
-{
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
deleted file mode 100644
index 252d1325f03..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/categories/Query.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.categories;
-
-public class Query
-{
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
deleted file mode 100644
index 8fb325b1c65..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.sql.http.SqlQuery;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.apache.druid.testsEx.categories.MultiStageQuery;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQuery.class)
-public class ITKeyStatisticsSketchMergeMode
-{
- @Inject
- private MsqTestQueryHelper msqHelper;
-
- @Inject
- private DataLoaderHelper dataLoaderHelper;
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
-
- private static final String QUERY_FILE =
"/multi-stage-query/wikipedia_msq_select_query1.json";
-
- @Test
- public void testMsqIngestionParallelMerging() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY \"__time\"",
- datasource
- );
-
- ImmutableMap<String, Object> context = ImmutableMap.of(
- MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
- ClusterStatisticsMergeMode.PARALLEL
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false,
context, null);
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- @Test
- public void testMsqIngestionSequentialMerging() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY \"__time\"",
- datasource
- );
-
- ImmutableMap<String, Object> context = ImmutableMap.of(
- MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
- ClusterStatisticsMergeMode.SEQUENTIAL
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false,
context, null);
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
-
- @Test
- public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws
Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "Replace INTO %s overwrite ALL \n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "where delta=111 "
- // we add this filter since delta=111 is only present in
wikipedia_index_data1.json. This means partitions from worker 2 will be empty.
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY \"__time\"",
- datasource
- );
-
- ImmutableMap<String, Object> context = ImmutableMap.of(
- MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
- ClusterStatisticsMergeMode.SEQUENTIAL,
- MultiStageQueryContext.CTX_MAX_NUM_TASKS,
- 3
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false,
context, null);
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(sqlQuery);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
-
msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json",
datasource);
- }
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
deleted file mode 100644
index ce6462e225a..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQuery.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQuery.class)
-public class ITMultiStageQuery extends MultiStageQuery
-{
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
deleted file mode 100644
index 99282ecae4c..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMultiStageQueryWorkerFaultTolerance.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.apache.druid.testsEx.categories.MultiStageQueryWithMM;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.apache.druid.testsEx.utils.DruidClusterAdminClient;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-/**
- * As we need to kill the PID of the launched task, these tests should be run
with middle manager only.
- */
-@RunWith(DruidTestRunner.class)
-@Category(MultiStageQueryWithMM.class)
-public class ITMultiStageQueryWorkerFaultTolerance
-{
- private static final Logger LOG = new
Logger(ITMultiStageQueryWorkerFaultTolerance.class);
- @Inject
- private MsqTestQueryHelper msqHelper;
-
- @Inject
- private DataLoaderHelper dataLoaderHelper;
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
-
- @Inject
- private DruidClusterAdminClient druidClusterAdminClient;
-
- private static final String QUERY_FILE =
"/multi-stage-query/wikipedia_msq_select_query_ha.json";
-
- @Test
- public void testMsqIngestionAndQuerying() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n"
- + "CLUSTERED BY \"__time\"",
- datasource
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTaskSuccesfully(
- queryLocal,
- ImmutableMap.of(
- MultiStageQueryContext.CTX_FAULT_TOLERANCE,
- "true",
- MultiStageQueryContext.CTX_MAX_NUM_TASKS,
- 3
- )
- );
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
-
- String taskIdToKill = sqlTaskStatus.getTaskId() + "-worker1_0";
- killTaskAbruptly(taskIdToKill);
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- private void killTaskAbruptly(String taskIdToKill)
- {
- String command = "jps -mlv | grep -i peon | grep -i " + taskIdToKill + "
|awk '{print $1}'";
-
- ITRetryUtil.retryUntil(() -> {
-
- Pair<String, String> stdOut =
druidClusterAdminClient.runCommandInMiddleManagerContainer("/bin/bash", "-c",
-
command
- );
- LOG.info(StringUtils.format(
- "command %s \nstdout: %s\nstderr: %s",
- command,
- stdOut.lhs,
- stdOut.rhs
- ));
- if (stdOut.rhs != null && stdOut.rhs.length() != 0) {
- throw new ISE("Bad command");
- }
- String pidToKill = stdOut.lhs.trim();
- if (pidToKill.length() != 0) {
- LOG.info("Killing pid %s", pidToKill);
- final Pair<String, String> killResult =
druidClusterAdminClient.runCommandInMiddleManagerContainer(
- "/bin/bash",
- "-c",
- "kill -9 " + pidToKill
- );
- LOG.info(StringUtils.format("Kill command stdout: %s, stderr: %s",
killResult.lhs, killResult.rhs));
- return true;
- } else {
- return false;
- }
- }, true, 2000, 100, StringUtils.format("Figuring out PID for task[%s] to
kill abruptly", taskIdToKill));
- }
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
deleted file mode 100644
index d77d5dfe2c9..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/MultiStageQuery.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.msq;
-
-import com.google.api.client.util.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import org.apache.druid.indexer.report.TaskContextReport;
-import org.apache.druid.indexer.report.TaskReport;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.storage.local.LocalFileExportStorageProvider;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.utils.DataLoaderHelper;
-import org.apache.druid.testing.utils.MsqTestQueryHelper;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class MultiStageQuery
-{
- @Inject
- private MsqTestQueryHelper msqHelper;
-
- @Inject
- private DataLoaderHelper dataLoaderHelper;
-
- @Inject
- private CoordinatorResourceTestClient coordinatorClient;
-
- private static final String QUERY_FILE =
"/multi-stage-query/wikipedia_msq_select_query1.json";
-
- @Test
- public void testMsqIngestionAndQuerying() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n"
- + "PARTITIONED BY DAY\n",
- datasource
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlTaskStatus sqlTaskStatus =
msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- @Test
- @Ignore("localfiles() is disabled")
- public void testMsqIngestionAndQueryingWithLocalFn() throws Exception
- {
- String datasource = "dst";
-
- // Clear up the datasource from the previous runs
- coordinatorClient.unloadSegmentsForDataSource(datasource);
-
- String queryLocal =
- StringUtils.format(
- "INSERT INTO %s\n"
- + "SELECT\n"
- + " TIME_PARSE(\"timestamp\") AS __time,\n"
- + " isRobot,\n"
- + " diffUrl,\n"
- + " added,\n"
- + " countryIsoCode,\n"
- + " regionName,\n"
- + " channel,\n"
- + " flags,\n"
- + " delta,\n"
- + " isUnpatrolled,\n"
- + " isNew,\n"
- + " deltaBucket,\n"
- + " isMinor,\n"
- + " isAnonymous,\n"
- + " deleted,\n"
- + " cityName,\n"
- + " metroCode,\n"
- + " namespace,\n"
- + " comment,\n"
- + " page,\n"
- + " commentLength,\n"
- + " countryName,\n"
- + " user,\n"
- + " regionIsoCode\n"
- + "FROM TABLE(\n"
- + " LOCALFILES(\n"
- + " files =>
ARRAY['/resources/data/batch_index/json/wikipedia_index_data1.json'],\n"
- + " format => 'json'\n"
- + " ))\n"
- + " (\"timestamp\" VARCHAR, isRobot VARCHAR, diffUrl VARCHAR,
added BIGINT, countryIsoCode VARCHAR, regionName VARCHAR,\n"
- + " channel VARCHAR, flags VARCHAR, delta BIGINT, isUnpatrolled
VARCHAR, isNew VARCHAR, deltaBucket DOUBLE,\n"
- + " isMinor VARCHAR, isAnonymous VARCHAR, deleted BIGINT,
cityName VARCHAR, metroCode BIGINT, namespace VARCHAR,\n"
- + " comment VARCHAR, page VARCHAR, commentLength BIGINT,
countryName VARCHAR, \"user\" VARCHAR, regionIsoCode VARCHAR)\n"
- + "PARTITIONED BY DAY\n",
- datasource
- );
-
- // Submit the task and wait for the datasource to get loaded
- SqlTaskStatus sqlTaskStatus =
msqHelper.submitMsqTaskSuccesfully(queryLocal);
-
- if (sqlTaskStatus.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- sqlTaskStatus.getError()
- ));
- }
-
- msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId());
- dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
-
- msqHelper.testQueriesFromFile(QUERY_FILE, datasource);
- }
-
- @Test
- public void testExport() throws Exception
- {
- String exportQuery =
- StringUtils.format(
- "INSERT INTO extern(%s(exportPath => '%s'))\n"
- + "AS CSV\n"
- + "SELECT page, added, delta\n"
- + "FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\"]}',\n"
- + " '{\"type\":\"json\"}',\n"
- + "
'[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"i
[...]
- + " )\n"
- + ")\n",
- LocalFileExportStorageProvider.TYPE_NAME, "/shared/export/"
- );
-
- SqlTaskStatus exportTask = msqHelper.submitMsqTaskSuccesfully(exportQuery);
-
- msqHelper.pollTaskIdForSuccess(exportTask.getTaskId());
-
- if (exportTask.getState().isFailure()) {
- Assert.fail(StringUtils.format(
- "Unable to start the task successfully.\nPossible exception: %s",
- exportTask.getError()
- ));
- }
-
- String resultQuery = StringUtils.format(
- "SELECT page, delta, added\n"
- + " FROM TABLE(\n"
- + " EXTERN(\n"
- + "
'{\"type\":\"local\",\"baseDir\":\"/shared/export/\",\"filter\":\"*.csv\"}',\n"
- + " '{\"type\":\"csv\",\"findColumnsFromHeader\":true}'\n"
- + " )\n"
- + " ) EXTEND (\"added\" BIGINT, \"delta\" BIGINT, \"page\" VARCHAR)\n"
- + " WHERE delta != 0\n"
- + " ORDER BY page");
-
- SqlTaskStatus resultTaskStatus =
msqHelper.submitMsqTaskSuccesfully(resultQuery);
-
- msqHelper.pollTaskIdForSuccess(resultTaskStatus.getTaskId());
-
- TaskReport.ReportMap statusReport =
msqHelper.fetchStatusReports(resultTaskStatus.getTaskId());
-
- MSQTaskReport taskReport = (MSQTaskReport)
statusReport.get(MSQTaskReport.REPORT_KEY);
- if (taskReport == null) {
- throw new ISE("Unable to fetch the status report for the task [%]",
resultTaskStatus.getTaskId());
- }
- TaskContextReport taskContextReport = (TaskContextReport)
statusReport.get(TaskContextReport.REPORT_KEY);
- Assert.assertFalse(taskContextReport.getPayload().isEmpty());
-
- MSQTaskReportPayload taskReportPayload = Preconditions.checkNotNull(
- taskReport.getPayload(),
- "payload"
- );
- MSQResultsReport resultsReport = Preconditions.checkNotNull(
- taskReportPayload.getResults(),
- "Results report for the task id is empty"
- );
-
- List<List<Object>> actualResults = new ArrayList<>();
-
- for (final Object[] row : resultsReport.getResults()) {
- actualResults.add(Arrays.asList(row));
- }
-
- ImmutableList<ImmutableList<Object>> expectedResults = ImmutableList.of(
- ImmutableList.of("Cherno Alpha", 111, 123),
- ImmutableList.of("Gypsy Danger", -143, 57),
- ImmutableList.of("Striker Eureka", 330, 459)
- );
-
- Assert.assertEquals(
- expectedResults,
- actualResults
- );
- }
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
deleted file mode 100644
index 03022da32fa..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/ITUnionQueryTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.query;
-
-import org.apache.druid.testsEx.categories.Query;
-import org.apache.druid.testsEx.config.DruidTestRunner;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-
-@RunWith(DruidTestRunner.class)
-@Category(Query.class)
-public class ITUnionQueryTest extends UnionQueryTest
-{
-}
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
deleted file mode 100644
index 34cd2c30eae..00000000000
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/query/UnionQueryTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testsEx.query;
-
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaEventWriter;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.utils.KafkaAdminClient;
-import org.apache.druid.testsEx.indexer.AbstractIndexerTest;
-import org.joda.time.Interval;
-import org.junit.Test;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.function.Function;
-
-public class UnionQueryTest extends AbstractIndexerTest
-{
- private static final Logger LOG = new Logger(UnionQueryTest.class);
- private static final String UNION_SUPERVISOR_TEMPLATE =
"/query/union_kafka_supervisor_template.json";
- private static final String UNION_DATA_FILE = "/query/union_data.json";
- private static final String UNION_QUERIES_RESOURCE =
"/query/union_queries.json";
- private static final String UNION_DATASOURCE = "wikipedia_index_test";
- private String fullDatasourceName;
-
- @Test
- public void testUnionQuery() throws Exception
- {
- fullDatasourceName = UNION_DATASOURCE +
config.getExtraDatasourceNameSuffix();
- final String baseName = fullDatasourceName + UUID.randomUUID();
- KafkaAdminClient streamAdminClient = new KafkaAdminClient(config);
- List<String> supervisors = new ArrayList<>();
-
- final int numDatasources = 3;
- for (int i = 0; i < numDatasources; i++) {
- String datasource = baseName + "-" + i;
- streamAdminClient.createStream(datasource, 1, Collections.emptyMap());
- ITRetryUtil.retryUntil(
- () -> streamAdminClient.isStreamActive(datasource),
- true,
- 10000,
- 30,
- "Wait for stream active"
- );
- String supervisorSpec = generateStreamIngestionPropsTransform(
- datasource,
- datasource,
- config
- ).apply(getResourceAsString(UNION_SUPERVISOR_TEMPLATE));
- LOG.info("supervisorSpec: [%s]\n", supervisorSpec);
- // Start supervisor
- String specResponse = indexer.submitSupervisor(supervisorSpec);
- LOG.info("Submitted supervisor [%s]", specResponse);
- supervisors.add(specResponse);
-
- int ctr = 0;
- try (
- StreamEventWriter streamEventWriter = new KafkaEventWriter(config,
false);
- BufferedReader reader = new BufferedReader(
- new InputStreamReader(getResourceAsStream(UNION_DATA_FILE),
StandardCharsets.UTF_8)
- )
- ) {
- String line;
- while ((line = reader.readLine()) != null) {
- streamEventWriter.write(datasource, StringUtils.toUtf8(line));
- ctr++;
- }
- }
- final int numWritten = ctr;
-
- LOG.info("Waiting for stream indexing tasks to consume events");
-
- ITRetryUtil.retryUntilTrue(
- () ->
- numWritten == this.queryHelper.countRows(
- datasource,
- Intervals.ETERNITY,
- name -> new LongSumAggregatorFactory(name, "count")
- ),
- StringUtils.format(
- "dataSource[%s] consumed [%,d] events, expected [%,d]",
- datasource,
- this.queryHelper.countRows(
- datasource,
- Intervals.ETERNITY,
- name -> new LongSumAggregatorFactory(name, "count")
- ),
- numWritten
- )
- );
- }
-
- String queryResponseTemplate = StringUtils.replace(
- getResourceAsString(UNION_QUERIES_RESOURCE),
- "%%DATASOURCE%%",
- baseName
- );
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
-
-
- for (int i = 0; i < numDatasources; i++) {
- indexer.terminateSupervisor(supervisors.get(i));
- streamAdminClient.deleteStream(baseName + "-" + i);
- }
-
- for (int i = 0; i < numDatasources; i++) {
- final int datasourceNumber = i;
- ITRetryUtil.retryUntil(
- () -> coordinator.areSegmentsLoaded(baseName + "-" +
datasourceNumber),
- true,
- 10000,
- 10,
- "Kafka segments loaded"
- );
- }
-
- queryHelper.testQueriesFromString(queryResponseTemplate);
-
- for (int i = 0; i < numDatasources; i++) {
- final String datasource = baseName + "-" + i;
- List<String> intervals = coordinator.getSegmentIntervals(datasource);
-
- Collections.sort(intervals);
- String first = intervals.get(0).split("/")[0];
- String last = intervals.get(intervals.size() - 1).split("/")[1];
- Interval interval = Intervals.of(first + "/" + last);
- coordinator.unloadSegmentsForDataSource(baseName + "-" + i);
- ITRetryUtil.retryUntilFalse(
- () -> coordinator.areSegmentsLoaded(datasource),
- "Segment Unloading"
- );
- coordinator.deleteSegmentsDataSource(baseName + "-" + i, interval);
- }
- }
-
-
- /**
- * sad version of
- * {@link
org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest#generateStreamIngestionPropsTransform}
- */
- private Function<String, String> generateStreamIngestionPropsTransform(
- String streamName,
- String fullDatasourceName,
- IntegrationTestingConfig config
- )
- {
- final Map<String, Object> consumerConfigs =
KafkaConsumerConfigs.getConsumerProperties();
- final Properties consumerProperties = new Properties();
- consumerProperties.putAll(consumerConfigs);
- consumerProperties.setProperty("bootstrap.servers",
config.getKafkaInternalHost());
- KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
- return spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%DATASOURCE%%",
- fullDatasourceName
- );
- spec = StringUtils.replace(
- spec,
- "%%TOPIC_VALUE%%",
- streamName
- );
- return StringUtils.replace(
- spec,
- "%%STREAM_PROPERTIES_VALUE%%",
- jsonMapper.writeValueAsString(consumerProperties)
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- }
-}
diff --git
a/integration-tests-ex/cases/src/test/resources/query/union_data.json
b/integration-tests-ex/cases/src/test/resources/query/union_data.json
deleted file mode 100644
index b186657dbcf..00000000000
--- a/integration-tests-ex/cases/src/test/resources/query/union_data.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" :
"en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot":
"false", "anonymous": "false", "namespace":"article", "continent":"North
America", "country":"United States", "region":"Bay Area", "city":"San
Francisco", "added": 57, "deleted": 200, "delta": -143}
-{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" :
"en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot":
"true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia",
"country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459,
"deleted": 129, "delta": 330}
-{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" :
"ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true",
"robot": "true", "anonymous": "false", "namespace":"article",
"continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow",
"added": 123, "deleted": 12, "delta": 111}
-{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" :
"zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false",
"robot": "true", "anonymous": "false", "namespace":"wikipedia",
"continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan",
"added": 905, "deleted": 5, "delta": 900}
-{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" :
"ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false",
"robot": "true", "anonymous": "false", "namespace":"wikipedia",
"continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo",
"added": 1, "deleted": 10, "delta": -9}
-{"timestamp": "2013-09-01T01:02:33Z", "page": "Gypsy Danger", "language" :
"en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot":
"false", "anonymous": "false", "namespace":"article", "continent":"North
America", "country":"United States", "region":"Bay Area", "city":"San
Francisco", "added": 57, "deleted": 200, "delta": -143}
-{"timestamp": "2013-09-01T03:32:45Z", "page": "Striker Eureka", "language" :
"en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot":
"true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia",
"country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459,
"deleted": 129, "delta": 330}
-{"timestamp": "2013-09-01T07:11:21Z", "page": "Cherno Alpha", "language" :
"ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true",
"robot": "true", "anonymous": "false", "namespace":"article",
"continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow",
"added": 123, "deleted": 12, "delta": 111}
-{"timestamp": "2013-09-01T11:58:39Z", "page": "Crimson Typhoon", "language" :
"zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false",
"robot": "true", "anonymous": "false", "namespace":"wikipedia",
"continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan",
"added": 905, "deleted": 5, "delta": 900}
-{"timestamp": "2013-09-01T12:41:27Z", "page": "Coyote Tango", "language" :
"ja", "user" : "stringer", "unpatrolled" : "true", "newPage" : "false",
"robot": "true", "anonymous": "false", "namespace":"wikipedia",
"continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo",
"added": 1, "deleted": 10, "delta": -9}
\ No newline at end of file
diff --git
a/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
b/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
deleted file mode 100644
index a37340fe858..00000000000
---
a/integration-tests-ex/cases/src/test/resources/query/union_kafka_supervisor_template.json
+++ /dev/null
@@ -1,69 +0,0 @@
-{
- "type": "kafka",
- "dataSchema": {
- "dataSource": "%%DATASOURCE%%",
- "timestampSpec": {
- "column": "timestamp",
- "format": "auto"
- },
- "dimensionsSpec": {
- "dimensions": [
- "page",
- "language",
- "user",
- "unpatrolled",
- "newPage",
- "robot",
- "anonymous",
- "namespace",
- "continent",
- "country",
- "region",
- "city"
- ],
- "dimensionExclusions": [],
- "spatialDimensions": []
- },
- "metricsSpec": [
- {
- "type": "count",
- "name": "count"
- },
- {
- "type": "doubleSum",
- "name": "added",
- "fieldName": "added"
- },
- {
- "type": "doubleSum",
- "name": "deleted",
- "fieldName": "deleted"
- },
- {
- "type": "doubleSum",
- "name": "delta",
- "fieldName": "delta"
- }
- ],
- "granularitySpec": {
- "type": "uniform",
- "segmentGranularity": "DAY",
- "queryGranularity": "second"
- }
- },
- "tuningConfig": {
- "type": "kafka",
- "intermediatePersistPeriod": "PT1H",
- "maxRowsPerSegment": 5000000,
- "maxRowsInMemory": 500000
- },
- "ioConfig": {
- "topic": "%%TOPIC_VALUE%%",
- "consumerProperties": %%STREAM_PROPERTIES_VALUE%%,
- "taskCount": 2,
- "replicas": 1,
- "taskDuration": "PT120S",
- "useEarliestOffset": true,
- "inputFormat" : {"type": "json"}
- }
-}
\ No newline at end of file
diff --git
a/integration-tests-ex/cases/src/test/resources/query/union_queries.json
b/integration-tests-ex/cases/src/test/resources/query/union_queries.json
deleted file mode 100644
index 74dd6db299d..00000000000
--- a/integration-tests-ex/cases/src/test/resources/query/union_queries.json
+++ /dev/null
@@ -1,566 +0,0 @@
-[
- {
- "description": "timeseries, filtered, all aggs, all",
- "query": {
- "queryType": "timeseries",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "filter": {
- "type": "selector",
- "dimension": "language",
- "value": "en"
- },
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- },
- {
- "type": "doubleSum",
- "fieldName": "added",
- "name": "added"
- },
- {
- "type": "doubleSum",
- "fieldName": "deleted",
- "name": "deleted"
- },
- {
- "type": "doubleSum",
- "fieldName": "delta",
- "name": "delta"
- }
- ],
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T01:02:33.000Z",
- "result": {
- "added": 1548.0,
- "count": 6,
- "delta": 561.0,
- "deleted": 987.0,
- "rows": 6
- }
- }
- ]
- },
- {
- "description": "topN, all aggs, page dim, uniques metric",
- "query": {
- "queryType": "topN",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- },
- {
- "type": "doubleSum",
- "fieldName": "added",
- "name": "added"
- },
- {
- "type": "doubleSum",
- "fieldName": "deleted",
- "name": "deleted"
- },
- {
- "type": "doubleSum",
- "fieldName": "delta",
- "name": "delta"
- }
- ],
- "dimension": "page",
- "metric": "added",
- "threshold": 3,
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T01:02:33.000Z",
- "result": [
- {
- "added": 2715.0,
- "count": 3,
- "page": "Crimson Typhoon",
- "delta": 2700.0,
- "deleted": 15.0,
- "rows": 3
- },
- {
- "added": 1377.0,
- "count": 3,
- "page": "Striker Eureka",
- "delta": 990.0,
- "deleted": 387.0,
- "rows": 3
- },
- {
- "added": 369.0,
- "count": 3,
- "page": "Cherno Alpha",
- "delta": 333.0,
- "deleted": 36.0,
- "rows": 3
- }
- ]
- }
- ]
- },
- {
- "description": "topN, all aggs, page dim, count metric, postAggs",
- "query": {
- "queryType": "topN",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- },
- {
- "type": "doubleSum",
- "fieldName": "added",
- "name": "added"
- },
- {
- "type": "doubleSum",
- "fieldName": "deleted",
- "name": "deleted"
- },
- {
- "type": "doubleSum",
- "fieldName": "delta",
- "name": "delta"
- }
- ],
- "postAggregations": [
- {
- "type": "arithmetic",
- "name": "sumOfAddedDeletedConst",
- "fn": "+",
- "fields": [
- {
- "type": "fieldAccess",
- "name": "added",
- "fieldName": "added"
- },
- {
- "type": "arithmetic",
- "name": "",
- "fn": "+",
- "fields": [
- {
- "type": "fieldAccess",
- "name": "deleted",
- "fieldName": "deleted"
- },
- {
- "type": "constant",
- "name": "constant",
- "value": 1000
- }
- ]
- }
- ]
- }
- ],
- "dimension": "page",
- "metric": "added",
- "threshold": 3,
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T01:02:33.000Z",
- "result": [
- {
- "added": 2715.0,
- "count": 3,
- "page": "Crimson Typhoon",
- "delta": 2700.0,
- "deleted": 15.0,
- "sumOfAddedDeletedConst": 3730.0,
- "rows": 3
- },
- {
- "added": 1377.0,
- "count": 3,
- "page": "Striker Eureka",
- "delta": 990.0,
- "deleted": 387.0,
- "sumOfAddedDeletedConst": 2764.0,
- "rows": 3
- },
- {
- "added": 369.0,
- "count": 3,
- "page": "Cherno Alpha",
- "delta": 333.0,
- "deleted": 36.0,
- "sumOfAddedDeletedConst": 1405.0,
- "rows": 3
- }
- ]
- }
- ]
- },
- {
- "description": "topN, lexicographic, two aggs, language dim, postAggs",
- "query": {
- "queryType": "topN",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- }
- ],
- "postAggregations": [
- {
- "type": "arithmetic",
- "name": "sumOfRowsAndCount",
- "fn": "+",
- "fields": [
- {
- "type": "fieldAccess",
- "name": "rows",
- "fieldName": "rows"
- },
- {
- "type": "fieldAccess",
- "name": "count",
- "fieldName": "count"
- }
- ]
- }
- ],
- "dimension": "language",
- "metric": {
- "type": "lexicographic",
- "previousStop": "a"
- },
- "threshold": 3,
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T01:02:33.000Z",
- "result": [
- {
- "sumOfRowsAndCount": 12.0,
- "count": 6,
- "language": "en",
- "rows": 6
- },
- {
- "sumOfRowsAndCount": 6.0,
- "count": 3,
- "language": "ja",
- "rows": 3
- },
- {
- "sumOfRowsAndCount": 6.0,
- "count": 3,
- "language": "ru",
- "rows": 3
- }
- ]
- }
- ]
- },
- {
- "description": "groupBy, two aggs, namespace dim, postAggs",
- "query": {
- "queryType": "groupBy",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- }
- ],
- "postAggregations": [
- {
- "type": "arithmetic",
- "name": "sumOfRowsAndCount",
- "fn": "+",
- "fields": [
- {
- "type": "fieldAccess",
- "name": "rows",
- "fieldName": "rows"
- },
- {
- "type": "fieldAccess",
- "name": "count",
- "fieldName": "count"
- }
- ]
- }
- ],
- "dimensions": ["namespace"],
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "version": "v1",
- "timestamp": "2013-08-31T00:00:00.000Z",
- "event": {
- "sumOfRowsAndCount": 12.0,
- "count": 6,
- "rows": 6,
- "namespace": "article"
- }
- },
- {
- "version": "v1",
- "timestamp": "2013-08-31T00:00:00.000Z",
- "event": {
- "sumOfRowsAndCount": 18.0,
- "count": 9,
- "rows": 9,
- "namespace": "wikipedia"
- }
- }
- ]
- },
- {
- "description": "groupBy, two aggs, namespace + robot dim, postAggs",
- "query": {
- "queryType": "groupBy",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "intervals": ["2013-08-31/2013-09-01"],
- "granularity": "all",
- "aggregations": [
- {
- "type": "count",
- "name": "rows"
- },
- {
- "type": "longSum",
- "fieldName": "count",
- "name": "count"
- }
- ],
- "postAggregations": [
- {
- "type": "arithmetic",
- "name": "sumOfRowsAndCount",
- "fn": "+",
- "fields": [
- {
- "type": "fieldAccess",
- "name": "rows",
- "fieldName": "rows"
- },
- {
- "type": "fieldAccess",
- "name": "count",
- "fieldName": "count"
- }
- ]
- }
- ],
- "dimensions": ["namespace", "robot"],
- "limitSpec": {
- "type": "default",
- "limit": 3,
- "orderBy": ["robot", "namespace"]
- },
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "version": "v1",
- "timestamp": "2013-08-31T00:00:00.000Z",
- "event": {
- "sumOfRowsAndCount": 6.0,
- "count": 3,
- "robot": "false",
- "rows": 3,
- "namespace": "article"
- }
- },
- {
- "version": "v1",
- "timestamp": "2013-08-31T00:00:00.000Z",
- "event": {
- "sumOfRowsAndCount": 6.0,
- "count": 3,
- "robot": "true",
- "rows": 3,
- "namespace": "article"
- }
- },
- {
- "version": "v1",
- "timestamp": "2013-08-31T00:00:00.000Z",
- "event": {
- "sumOfRowsAndCount": 18.0,
- "count": 9,
- "robot": "true",
- "rows": 9,
- "namespace": "wikipedia"
- }
- }
- ]
- },
- {
- "query": {
- "queryType": "search",
- "intervals": ["2013-08-31/2013-09-01"],
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- },
- "granularity": "all",
- "query": {
- "type": "insensitive_contains",
- "value": "ip"
- },
- "context": {
- "useCache": "true",
- "populateCache": "true",
- "timeout": 60000
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T00:00:00.000Z",
- "result": [
- {
- "dimension": "user",
- "value": "triplets",
- "count":3
- },
- {
- "dimension": "namespace",
- "value": "wikipedia",
- "count":9
- }
- ]
- }
- ]
- },
- {
- "description": "timeboundary, 1 agg, union",
- "query": {
- "queryType": "timeBoundary",
- "dataSource": {
- "type": "union",
- "dataSources": [
- "%%DATASOURCE%%-1", "%%DATASOURCE%%-2", "%%DATASOURCE%%-3",
- "%%DATASOURCE%%-0"
- ]
- }
- },
- "expectedResults": [
- {
- "timestamp": "2013-08-31T01:02:33.000Z",
- "result": {
- "minTime": "2013-08-31T01:02:33.000Z",
- "maxTime": "2013-09-01T12:41:27.000Z"
- }
- }
- ]
- }
-]
diff --git a/integration-tests-ex/image/docker-build.sh
b/integration-tests-ex/image/docker-build.sh
index 4ff5d7a7438..006ffaf092d 100755
--- a/integration-tests-ex/image/docker-build.sh
+++ b/integration-tests-ex/image/docker-build.sh
@@ -55,21 +55,3 @@ docker build -t $DRUID_IT_IMAGE_NAME \
--build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
--build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
.
-
-if [[ -z "${BACKWARD_COMPATIBILITY_IT_ENABLED:-""}" ||
$BACKWARD_COMPATIBILITY_IT_ENABLED != "true" ]]; then
- echo "Not building previous version image."
- exit 0
-fi
-
-# Download the previous druid tar
-curl -L $DRUID_PREVIOUS_VERSION_DOWNLOAD_URL --output
apache-druid-$DRUID_PREVIOUS_VERSION-bin.tar.gz
-
-docker build -t $DRUID_PREVIOUS_IT_IMAGE_NAME \
- --build-arg DRUID_VERSION=$DRUID_PREVIOUS_VERSION \
- --build-arg MYSQL_VERSION=$MYSQL_VERSION \
- --build-arg MARIADB_VERSION=$MARIADB_VERSION \
- --build-arg CONFLUENT_VERSION=$CONFLUENT_VERSION \
- --build-arg HADOOP_VERSION=$HADOOP_VERSION \
- --build-arg MYSQL_DRIVER_CLASSNAME=$MYSQL_DRIVER_CLASSNAME \
- --build-arg DRUID_TESTING_TOOLS_VERSION=$DRUID_VERSION \
- .
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index e04fe7b7c67..d7279e60347 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -141,6 +141,7 @@ public class MSQWorkerTask extends AbstractTask
@Override
public TaskStatus runTask(final TaskToolbox toolbox)
{
+ emitMetric(toolbox.getEmitter(), "ingest/count", 1);
try (final IndexerWorkerContext context =
IndexerWorkerContext.createProductionInstance(this, toolbox, injector)) {
// We'll run the worker in a separate thread, so we can interrupt the
thread on shutdown or controller failure.
final ListeningExecutorService workerExec =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]