This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6fdce36 Add integration tests for query retry on missing segments
(#10171)
6fdce36 is described below
commit 6fdce36e41b109f74077c0a68fc2d2ec5d771e29
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jul 22 22:30:35 2020 -0700
Add integration tests for query retry on missing segments (#10171)
* Add integration tests for query retry on missing segments
* add missing dependencies; fix travis conf
* address comments
* Integration tests extension
* remove unused dependency
* remove druid_main
* fix java agent port
---
.travis.yml | 19 +-
.../org/apache/druid/cli/CliCommandCreator.java | 5 +
.../apache/druid/indexing/common/task/Tasks.java | 2 +-
.../docker/docker-compose.query-retry-test.yml | 128 +++++++++++
integration-tests/docker/druid.sh | 9 +-
.../historical-for-query-retry-test | 33 +++
.../docker/test-data/query-retry-sample-data.sql | 20 ++
integration-tests/pom.xml | 9 +-
integration-tests/script/copy_resources.sh | 16 +-
integration-tests/script/docker_run_cluster.sh | 13 +-
.../druid/cli/CliHistoricalForQueryRetryTest.java | 56 +++++
.../druid/cli/QueryRetryTestCommandCreator.java | 14 +-
.../ServerManagerForQueryRetryTest.java | 141 +++++++++++++
.../druid/testing/utils/QueryWithResults.java | 7 +-
.../org.apache.druid.cli.CliCommandCreator} | 15 +-
.../java/org/apache/druid/tests/TestNGGroup.java | 2 +
.../query/ITQueryRetryTestOnMissingSegments.java | 235 +++++++++++++++++++++
.../druid/tests/query/ITWikipediaQueryTest.java | 1 -
...ipedia_editstream_queries_query_retry_test.json | 26 +++
integration-tests/stop_cluster.sh | 2 +-
.../java/org/apache/druid/query/QueryContexts.java | 15 +-
.../timeseries/TimeseriesQueryQueryToolChest.java | 14 +-
.../org/apache/druid/query/RetryQueryRunner.java | 8 +-
.../druid/server/coordination/ServerManager.java | 73 ++++---
.../java/org/apache/druid/cli/CliHistorical.java | 13 +-
25 files changed, 799 insertions(+), 77 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 6378c72..4bb73bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -246,7 +246,7 @@ jobs:
<<: *test_processing_module
name: "(openjdk8) other modules test"
env:
- -
MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console'
+ -
MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console,!integration-tests'
- <<: *test_other_modules
name: "(openjdk11) other modules test"
@@ -385,6 +385,14 @@ jobs:
script: *run_integration_test
after_failure: *integration_test_diags
+ - &integration_query_retry
+ name: "(Compile=openjdk8, Run=openjdk8) query retry integration test for
missing segments"
+ jdk: openjdk8
+ services: *integration_test_services
+ env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8'
+ script: *run_integration_test
+ after_failure: *integration_test_diags
+
- &integration_security
name: "(Compile=openjdk8, Run=openjdk8) security integration test"
jdk: openjdk8
@@ -413,7 +421,7 @@ jobs:
name: "(Compile=openjdk8, Run=openjdk8) other integration test"
jdk: openjdk8
services: *integration_test_services
- env:
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-sto
[...]
+ env:
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-h
[...]
script: *run_integration_test
after_failure: *integration_test_diags
# END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -444,6 +452,11 @@ jobs:
jdk: openjdk8
env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11'
+ - <<: *integration_query_retry
+ name: "(Compile=openjdk8, Run=openjdk11) query retry integration test
for missing segments"
+ jdk: openjdk8
+ env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11'
+
- <<: *integration_security
name: "(Compile=openjdk8, Run=openjdk11) security integration test"
jdk: openjdk8
@@ -462,7 +475,7 @@ jobs:
- <<: *integration_tests
name: "(Compile=openjdk8, Run=openjdk11) other integration test"
jdk: openjdk8
- env:
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-sto
[...]
+ env:
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-h
[...]
# END - Integration tests for Compile with Java 8 and Run with Java 11
- name: "security vulnerabilities"
diff --git a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
b/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
index dc314a7..f554d16 100644
--- a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
+++ b/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
@@ -23,6 +23,11 @@ import io.airlift.airline.Cli;
import org.apache.druid.guice.annotations.ExtensionPoint;
/**
+ * An extension point to create a custom Druid service. Druid can understand
and execute custom commands
+ * to run services loaded via Druid's extension system (see {@code
Initialization#getFromExtensions}). See
+ * the {@code Main} class for details of groups and commands.
+ *
+ * Implementations should be registered in the {@code
META-INF/services/org.apache.druid.cli.CliCommandCreator} file.
*/
@ExtensionPoint
public interface CliCommandCreator
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index e16e52e..502eb85 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -51,7 +51,7 @@ public class Tasks
public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
/**
- *This context is used in auto compaction. When it is set in the context,
the segments created by the task
+ * This context is used in auto compaction. When it is set in the context,
the segments created by the task
* will fill 'lastCompactionState' in its metadata. This will be used to
track what segments are compacted or not.
* See {@link org.apache.druid.timeline.DataSegment} and {@link
* org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for
more details.
diff --git a/integration-tests/docker/docker-compose.query-retry-test.yml
b/integration-tests/docker/docker-compose.query-retry-test.yml
new file mode 100644
index 0000000..9b5a5a6
--- /dev/null
+++ b/integration-tests/docker/docker-compose.query-retry-test.yml
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+ druid-zookeeper-kafka:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-zookeeper-kafka
+
+ druid-metadata-storage:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-metadata-storage
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ depends_on:
+ - druid-zookeeper-kafka
+
+ druid-overlord:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-overlord
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-metadata-storage:druid-metadata-storage
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ depends_on:
+ - druid-metadata-storage
+ - druid-zookeeper-kafka
+
+ druid-coordinator:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-coordinator
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-overlord:druid-overlord
+ - druid-metadata-storage:druid-metadata-storage
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ depends_on:
+ - druid-overlord
+ - druid-metadata-storage
+ - druid-zookeeper-kafka
+
+ druid-historical:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-historical
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ depends_on:
+ - druid-zookeeper-kafka
+
+ druid-broker:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-broker
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ - druid-historical:druid-historical
+ depends_on:
+ - druid-zookeeper-kafka
+ - druid-historical
+
+ druid-router:
+ extends:
+ file: docker-compose.base.yml
+ service: druid-router
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ - druid-coordinator:druid-coordinator
+ - druid-broker:druid-broker
+ depends_on:
+ - druid-zookeeper-kafka
+ - druid-coordinator
+ - druid-broker
+
+ druid-historical-for-query-retry-test:
+ image: druid/cluster
+ container_name: druid-historical-for-query-retry-test
+ networks:
+ druid-it-net:
+ ipv4_address: 172.172.172.13
+ ports:
+ - 8084:8083
+ - 8284:8283
+ - 5010:5007
+ privileged: true
+ volumes:
+ - ${HOME}/shared:/shared
+ - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
+ env_file:
+ - ./environment-configs/common
+ - ./environment-configs/historical-for-query-retry-test
+ environment:
+ - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+ links:
+ - druid-zookeeper-kafka:druid-zookeeper-kafka
+ depends_on:
+ - druid-zookeeper-kafka
+
+networks:
+ druid-it-net:
+ name: druid-it-net
+ ipam:
+ config:
+ - subnet: 172.172.172.0/24
\ No newline at end of file
diff --git a/integration-tests/docker/druid.sh
b/integration-tests/docker/druid.sh
index a17df11..4896102 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -23,6 +23,7 @@ getConfPath()
case "$1" in
_common) echo $cluster_conf_base/_common ;;
historical) echo $cluster_conf_base/data/historical ;;
+ historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;;
middleManager) echo $cluster_conf_base/data/middleManager ;;
coordinator) echo $cluster_conf_base/master/coordinator ;;
broker) echo $cluster_conf_base/query/broker ;;
@@ -82,14 +83,18 @@ setupData()
# The "query" and "security" test groups require data to be set up before
running the tests.
# In particular, they require segments to be downloaded from a pre-existing
s3 bucket.
# This is done by using the loadSpec put into the metadata store and s3
credentials set below.
- if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
+ if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
# touch is needed because OverlayFS's copy-up operation breaks POSIX
standards. See https://github.com/docker/for-linux/issues/72.
find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
&& cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql |
mysql -u root druid && /etc/init.d/mysql stop
# below s3 credentials needed to access the pre-existing s3 bucket
setKey $DRUID_SERVICE druid.s3.accessKey AKIAJI7DG7CDECGBQ6NA
setKey $DRUID_SERVICE druid.s3.secretKey
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
- setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
+ if [[ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]]; then
+ setKey $DRUID_SERVICE druid.extensions.loadList
[\"druid-s3-extensions\",\"druid-integration-tests\"]
+ else
+ setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
+ fi
# The region of the sample data s3 blobs needed for these test groups
export AWS_REGION=us-east-1
fi
diff --git
a/integration-tests/docker/environment-configs/historical-for-query-retry-test
b/integration-tests/docker/environment-configs/historical-for-query-retry-test
new file mode 100644
index 0000000..7cbe6d9
--- /dev/null
+++
b/integration-tests/docker/environment-configs/historical-for-query-retry-test
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+DRUID_SERVICE=historical-for-query-retry-test
+DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
+
+# JAVA OPTS
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m
-XX:MaxNewSize=256m -XX:+UseG1GC
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
+
+# Druid configs
+druid_processing_buffer_sizeBytes=25000000
+druid_processing_numThreads=2
+druid_query_groupBy_maxOnDiskStorage=300000000
+druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-retry-test","maxSize":5000000000}]
+druid_server_maxSize=5000000000
+druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-retry-test
+druid_server_https_crlPath=/tls/revocations.crl
diff --git a/integration-tests/docker/test-data/query-retry-sample-data.sql
b/integration-tests/docker/test-data/query-retry-sample-data.sql
new file mode 100644
index 0000000..18ab48a
--- /dev/null
+++ b/integration-tests/docker/test-data/query-retry-sample-data.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES
('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{
[...]
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES
('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{
[...]
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES
('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{
[...]
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES
('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830
[...]
+INSERT INTO druid_segments (id, dataSource, created_date, start, end,
partitioned, version, used, payload) VALUES
('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z',
'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z',
'2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1',
'{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\
[...]
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 4d64c66..4fee910 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -46,6 +46,14 @@
<artifactId>opencsv</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>airline</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws.sdk.version}</version>
@@ -195,7 +203,6 @@
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
<version>${project.parent.version}</version>
- <scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
diff --git a/integration-tests/script/copy_resources.sh
b/integration-tests/script/copy_resources.sh
index 0324a69..6ecd307 100755
--- a/integration-tests/script/copy_resources.sh
+++ b/integration-tests/script/copy_resources.sh
@@ -32,7 +32,17 @@ rm -rf $SHARED_DIR/docker
cp -R docker $SHARED_DIR/docker
mvn -B dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
+# install logging config
+cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
+
+# copy the integration test jar, it provides test-only extension
implementations
+cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
+
# move extensions into a separate extension folder
+# For druid-integration-tests
+mkdir -p $SHARED_DIR/docker/extensions/druid-integration-tests
+# We don't want to copy the tests jar.
+cp $SHARED_DIR/docker/lib/druid-integration-tests-*[^s].jar
$SHARED_DIR/docker/extensions/druid-integration-tests
# For druid-s3-extensions
mkdir -p $SHARED_DIR/docker/extensions/druid-s3-extensions
mv $SHARED_DIR/docker/lib/druid-s3-extensions-*
$SHARED_DIR/docker/extensions/druid-s3-extensions
@@ -64,12 +74,6 @@ then
curl
https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar
--output $SHARED_DIR/docker/lib/gcs-connector-hadoop2-latest.jar
fi
-# install logging config
-cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
-
-# copy the integration test jar, it provides test-only extension
implementations
-cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
-
# one of the integration tests needs the wikiticker sample data
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz
$SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
diff --git a/integration-tests/script/docker_run_cluster.sh
b/integration-tests/script/docker_run_cluster.sh
index 8f3bd92..c5faa4a 100755
--- a/integration-tests/script/docker_run_cluster.sh
+++ b/integration-tests/script/docker_run_cluster.sh
@@ -49,11 +49,16 @@ fi
then
if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
then
- # Start default Druid services and additional druid router
(custom-check-tls, permissive-tls, no-client-auth-tls)
- docker-compose -f ${DOCKERDIR}/docker-compose.yml -f
${DOCKERDIR}/docker-compose.security.yml up -d
+ # Start default Druid services and additional druid router
(custom-check-tls, permissive-tls, no-client-auth-tls)
+ docker-compose -f ${DOCKERDIR}/docker-compose.yml -f
${DOCKERDIR}/docker-compose.security.yml up -d
+ elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
+ then
+ # Start default Druid services with an additional historical modified
for query retry test
+ # See CliHistoricalForQueryRetryTest.
+ docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
else
- # Start default Druid services
- docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
+ # Start default Druid services
+ docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
fi
else
# run druid cluster with override config
diff --git
a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
new file mode 100644
index 0000000..6663e99
--- /dev/null
+++
b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import io.airlift.airline.Command;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
+
+import java.util.Properties;
+
+@Command(
+ name = "historical-for-query-retry-test",
+ description = "Runs a Historical node modified for query retry test"
+)
+public class CliHistoricalForQueryRetryTest extends CliHistorical
+{
+ private static final Logger log = new
Logger(CliHistoricalForQueryRetryTest.class);
+
+ public CliHistoricalForQueryRetryTest()
+ {
+ super();
+ }
+
+ @Inject
+ public void configure(Properties properties)
+ {
+ log.info("Historical is configured for testing query retry on missing
segments");
+ }
+
+ @Override
+ public void bindQuerySegmentWalker(Binder binder)
+ {
+
binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryRetryTest.class).in(LazySingleton.class);
+ }
+}
diff --git a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
similarity index 76%
copy from core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
copy to
integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
index dc314a7..9635c5a 100644
--- a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
+++
b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
@@ -19,13 +19,13 @@
package org.apache.druid.cli;
-import io.airlift.airline.Cli;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import io.airlift.airline.Cli.CliBuilder;
-/**
- */
-@ExtensionPoint
-public interface CliCommandCreator
+public class QueryRetryTestCommandCreator implements CliCommandCreator
{
- void addCommands(Cli.CliBuilder builder);
+ @Override
+ public void addCommands(CliBuilder builder)
+ {
+
builder.withGroup("server").withCommands(CliHistoricalForQueryRetryTest.class);
+ }
}
diff --git
a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
new file mode 100644
index 0000000..b81b165
--- /dev/null
+++
b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.guice.annotations.Processing;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+/**
+ * This server manager is designed to test query retry on missing segments. A
segment can be missing during a query
+ * if a historical drops the segment after the broker issues the query to the
historical. To mimic this situation,
+ * the historical with this server manager announces all segments assigned,
but reports missing segments for the
+ * first 3 segments specified in the query.
+ *
+ * @see org.apache.druid.query.RetryQueryRunner
+ */
+public class ServerManagerForQueryRetryTest extends ServerManager
+{
+ // Query context key that indicates this query is for query retry testing.
+ public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
+
+ private static final Logger LOG = new
Logger(ServerManagerForQueryRetryTest.class);
+ private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
+
+ private final ConcurrentHashMap<String, Set<SegmentDescriptor>>
queryToIgnoredSegments = new ConcurrentHashMap<>();
+
+ @Inject
+ public ServerManagerForQueryRetryTest(
+ QueryRunnerFactoryConglomerate conglomerate,
+ ServiceEmitter emitter,
+ @Processing ExecutorService exec,
+ CachePopulator cachePopulator,
+ @Smile ObjectMapper objectMapper,
+ Cache cache,
+ CacheConfig cacheConfig,
+ SegmentManager segmentManager,
+ JoinableFactory joinableFactory,
+ ServerConfig serverConfig
+ )
+ {
+ super(
+ conglomerate,
+ emitter,
+ exec,
+ cachePopulator,
+ objectMapper,
+ cache,
+ cacheConfig,
+ segmentManager,
+ joinableFactory,
+ serverConfig
+ );
+ }
+
+ @Override
+ <T> QueryRunner<T> buildQueryRunnerForSegment(
+ Query<T> query,
+ SegmentDescriptor descriptor,
+ QueryRunnerFactory<T, Query<T>> factory,
+ QueryToolChest<T, Query<T>> toolChest,
+ VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
+ Function<SegmentReference, SegmentReference> segmentMapFn,
+ AtomicLong cpuTimeAccumulator
+ )
+ {
+ if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
+ final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
+ queryToIgnoredSegments.compute(
+ query.getMostSpecificId(),
+ (queryId, ignoredSegments) -> {
+ if (ignoredSegments == null) {
+ ignoredSegments = new HashSet<>();
+ }
+ if (ignoredSegments.size() <
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+ ignoredSegments.add(descriptor);
+ isIgnoreSegment.setTrue();
+ }
+ return ignoredSegments;
+ }
+ );
+
+ if (isIgnoreSegment.isTrue()) {
+ LOG.info("Pretending I don't have segment[%s]", descriptor);
+ return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+ }
+ }
+ return super.buildQueryRunnerForSegment(
+ query,
+ descriptor,
+ factory,
+ toolChest,
+ timeline,
+ segmentMapFn,
+ cpuTimeAccumulator
+ );
+ }
+}
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
index 9f39bd5..13476bd 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
@@ -20,6 +20,7 @@
package org.apache.druid.testing.utils;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.Query;
import java.util.List;
@@ -27,14 +28,12 @@ import java.util.Map;
public class QueryWithResults extends AbstractQueryWithResults<Query>
{
-
@JsonCreator
public QueryWithResults(
- Query query,
- List<Map<String, Object>> expectedResults
+ @JsonProperty("query") Query query,
+ @JsonProperty("expectedResults") List<Map<String, Object>>
expectedResults
)
{
super(query, expectedResults);
}
-
}
diff --git a/integration-tests/stop_cluster.sh
b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
old mode 100755
new mode 100644
similarity index 58%
copy from integration-tests/stop_cluster.sh
copy to
integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
index d75e73f..70c37f6
--- a/integration-tests/stop_cluster.sh
+++
b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -14,16 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Skip stopping docker if flag set (For use during development)
-if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [
"$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" == true ]
- then
- exit 0
- fi
-
-for node in druid-historical druid-coordinator druid-overlord druid-router
druid-router-permissive-tls druid-router-no-client-auth-tls
druid-router-custom-check-tls druid-broker druid-middlemanager
druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
-do
- docker stop $node
- docker rm $node
-done
-
-docker network rm druid-it-net
+org.apache.druid.cli.QueryRetryTestCommandCreator
\ No newline at end of file
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index d759ff2..01d1e91 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -52,6 +52,8 @@ public class TestNGGroup
*/
public static final String QUERY = "query";
+ public static final String QUERY_RETRY = "query-retry";
+
public static final String REALTIME_INDEX = "realtime-index";
/**
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
new file mode 100644
index 0000000..817ee0b
--- /dev/null
+++
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.QueryResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.QueryResultVerifier;
+import org.apache.druid.testing.utils.QueryWithResults;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractIndexerTest;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class tests the query retry on missing segments. A segment can be
missing in a historical during a query if
+ * the historical drops the segment after the broker issues the query to the
historical. To mimic this case, this
+ * test spawns two historicals, a normal historical and a historical modified
for testing. The latter historical
+ * announces all segments assigned, but doesn't serve all of them. Instead, it
can report missing segments for some
+ * segments. See {@link ServerManagerForQueryRetryTest} for more details.
+ *
+ * To run this test properly, the test group must be specified as {@link
TestNGGroup#QUERY_RETRY}.
+ */
+@Test(groups = TestNGGroup.QUERY_RETRY)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITQueryRetryTestOnMissingSegments
+{
+ private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
+ private static final String QUERIES_RESOURCE =
"/queries/wikipedia_editstream_queries_query_retry_test.json";
+ private static final int TIMES_TO_RUN = 50;
+
+ /**
+ * This test runs the same query multiple times. This enumeration represents
an expectation after finishing
+ * running the query.
+ */
+ private enum Expectation
+ {
+ /**
+ * Expect that all runs succeed.
+ */
+ ALL_SUCCESS,
+ /**
+ * Expect that all runs return the 200 HTTP response, but some of them
can return an incorrect result.
+ */
+ INCORRECT_RESULT,
+ /**
+ * Expect that some runs can return the 500 HTTP response. For the runs
that returned the 200 HTTP response, the query
+ * result must be correct.
+ */
+ QUERY_FAILURE
+ }
+
+ @Inject
+ private CoordinatorResourceTestClient coordinatorClient;
+ @Inject
+ private TestQueryHelper queryHelper;
+ @Inject
+ private QueryResourceTestClient queryClient;
+ @Inject
+ private IntegrationTestingConfig config;
+ @Inject
+ private ObjectMapper jsonMapper;
+
+ @BeforeMethod
+ public void before()
+ {
+ // ensure that wikipedia segments are loaded completely
+ ITRetryUtil.retryUntilTrue(
+ () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE),
"wikipedia segment load"
+ );
+ }
+
+ @Test
+ public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
+ {
+ // Since retry is disabled and partial result is not allowed, we can
expect some queries to fail.
+ // If a query succeeds, its result must be correct.
+ testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
+ }
+
+ @Test
+ public void testWithRetriesDisabledPartialResultAllowed() throws Exception
+ {
+ // Since retry is disabled but partial result is allowed, all queries must
succeed.
+ // However, some queries can return an incorrect result.
testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
+ }
+
+ @Test
+ public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
+ {
+ // Since retry is enabled, all queries must succeed even though partial
result is disallowed.
+ // All queries must return correct result.
+ testQueries(buildQuery(30, false), Expectation.ALL_SUCCESS);
+ }
+
+ private void testQueries(String queryWithResultsStr, Expectation
expectation) throws Exception
+ {
+ final List<QueryWithResults> queries = jsonMapper.readValue(
+ queryWithResultsStr,
+ new TypeReference<List<QueryWithResults>>() {}
+ );
+ testQueries(queries, expectation);
+ }
+
+ private void testQueries(List<QueryWithResults> queries, Expectation
expectation) throws Exception
+ {
+ int querySuccess = 0;
+ int queryFailure = 0;
+ int resultMatches = 0;
+ int resultMismatches = 0;
+ for (int i = 0; i < TIMES_TO_RUN; i++) {
+ for (QueryWithResults queryWithResult : queries) {
+ final StatusResponseHolder responseHolder = queryClient
+ .queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()),
queryWithResult.getQuery())
+ .get();
+
+ if (responseHolder.getStatus().getCode() ==
HttpResponseStatus.OK.getCode()) {
+ querySuccess++;
+
+ List<Map<String, Object>> result = jsonMapper.readValue(
+ responseHolder.getContent(),
+ new TypeReference<List<Map<String, Object>>>() {}
+ );
+ if (!QueryResultVerifier.compareResults(result,
queryWithResult.getExpectedResults())) {
+ if (expectation != Expectation.INCORRECT_RESULT) {
+ throw new ISE(
+ "Incorrect query results for query %s \n expectedResults: %s
\n actualResults : %s",
+ queryWithResult.getQuery(),
+
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
+ jsonMapper.writeValueAsString(result)
+ );
+ } else {
+ resultMismatches++;
+ }
+ } else {
+ resultMatches++;
+ }
+ } else if (responseHolder.getStatus().getCode() ==
HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
+ expectation == Expectation.QUERY_FAILURE) {
+ final Map<String, Object> response =
jsonMapper.readValue(responseHolder.getContent(), Map.class);
+ final String errorMessage = (String) response.get("errorMessage");
+ Assert.assertNotNull(errorMessage, "errorMessage");
+ Assert.assertTrue(errorMessage.contains("No results found for
segments"));
+ queryFailure++;
+ } else {
+ throw new ISE(
+ "Unexpected failure, code: [%s], content: [%s]",
+ responseHolder.getStatus(),
+ responseHolder.getContent()
+ );
+ }
+ }
+ }
+
+ switch (expectation) {
+ case ALL_SUCCESS:
+ Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN,
querySuccess);
+ Assert.assertEquals(0, queryFailure);
+ Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN,
resultMatches);
+ Assert.assertEquals(0, resultMismatches);
+ break;
+ case QUERY_FAILURE:
+ Assert.assertTrue(querySuccess > 0, "At least one query is expected to
succeed.");
+ Assert.assertTrue(queryFailure > 0, "At least one query is expected to
fail.");
+ Assert.assertEquals(querySuccess, resultMatches);
+ Assert.assertEquals(0, resultMismatches);
+ break;
+ case INCORRECT_RESULT:
+ Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN,
querySuccess);
+ Assert.assertEquals(0, queryFailure);
+ Assert.assertTrue(resultMatches > 0, "At least one query is expected
to return correct results.");
+ Assert.assertTrue(resultMismatches > 0, "At least one query is
expected to return less results.");
+ break;
+ default:
+ throw new ISE("Unknown expectation[%s]", expectation);
+ }
+ }
+
+ private String buildQuery(int numRetriesOnMissingSegments, boolean
allowPartialResults) throws IOException
+ {
+ return StringUtils.replace(
+ AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
+ "%%CONTEXT%%",
+
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments,
allowPartialResults))
+ );
+ }
+
+ private static Map<String, Object> buildContext(int
numRetriesOnMissingSegments, boolean allowPartialResults)
+ {
+ final Map<String, Object> context = new HashMap<>();
+ // Disable cache so that each run hits historical.
+ context.put(QueryContexts.USE_CACHE_KEY, false);
+ context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
numRetriesOnMissingSegments);
+ context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
+ context.put(ServerManagerForQueryRetryTest.QUERY_RETRY_TEST_CONTEXT_KEY,
true);
+ return context;
+ }
+}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
index 389d33e..858b72d 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
@@ -64,7 +64,6 @@ public class ITWikipediaQueryTest
@BeforeMethod
public void before() throws Exception
{
-
// ensure that wikipedia segments are loaded completely
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE),
"wikipedia segment load"
diff --git
a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
new file mode 100644
index 0000000..0886cf4
--- /dev/null
+++
b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
@@ -0,0 +1,26 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query": {
+ "queryType": "timeseries",
+ "dataSource": "wikipedia_editstream",
+ "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
+ "granularity": "all",
+ "aggregations": [
+ {
+ "type": "count",
+ "name": "rows"
+ }
+ ],
+ "context": %%CONTEXT%%
+ },
+ "expectedResults": [
+ {
+ "timestamp": "2013-01-01T00:00:00.000Z",
+ "result": {
+ "rows": 2390950
+ }
+ }
+ ]
+ }
+]
diff --git a/integration-tests/stop_cluster.sh
b/integration-tests/stop_cluster.sh
index d75e73f..92f9c41 100755
--- a/integration-tests/stop_cluster.sh
+++ b/integration-tests/stop_cluster.sh
@@ -20,7 +20,7 @@ if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [
"$DRUID_INTEGRATION_TES
exit 0
fi
-for node in druid-historical druid-coordinator druid-overlord druid-router
druid-router-permissive-tls druid-router-no-client-auth-tls
druid-router-custom-check-tls druid-broker druid-middlemanager
druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
+for node in druid-historical druid-historical-for-query-retry-test
druid-coordinator druid-overlord druid-router druid-router-permissive-tls
druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker
druid-middlemanager druid-zookeeper-kafka druid-metadata-storage
druid-it-hadoop;
do
docker stop $node
docker rm $node
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 3667593..b70e8cf 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -53,6 +53,9 @@ public class QueryContexts
public static final String
JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY =
"enableJoinFilterRewriteValueColumnFilters";
public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY =
"joinFilterRewriteMaxSize";
public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
+ public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY =
"numRetriesOnMissingSegments";
+ public static final String RETURN_PARTIAL_RESULTS_KEY =
"returnPartialResults";
+ public static final String USE_CACHE_KEY = "useCache";
public static final boolean DEFAULT_BY_SEGMENT = false;
public static final boolean DEFAULT_POPULATE_CACHE = true;
@@ -143,7 +146,7 @@ public class QueryContexts
public static <T> boolean isUseCache(Query<T> query, boolean defaultValue)
{
- return parseBoolean(query, "useCache", defaultValue);
+ return parseBoolean(query, USE_CACHE_KEY, defaultValue);
}
public static <T> boolean isPopulateResultLevelCache(Query<T> query)
@@ -344,6 +347,16 @@ public class QueryContexts
return defaultTimeout;
}
+ public static <T> int getNumRetriesOnMissingSegments(Query<T> query, int
defaultValue)
+ {
+ return query.getContextValue(NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
defaultValue);
+ }
+
+ public static <T> boolean allowReturnPartialResults(Query<T> query, boolean
defaultValue)
+ {
+ return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
+ }
+
static <T> long parseLong(Query<T> query, String key, long defaultValue)
{
final Object val = query.getContextValue(key);
diff --git
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 2718282..9b4c94b 100644
---
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
@@ -138,8 +139,17 @@ public class TimeseriesQueryQueryToolChest extends
QueryToolChest<Result<Timeser
final Sequence<Result<TimeseriesResultValue>> finalSequence;
- if (query.getGranularity().equals(Granularities.ALL) &&
!query.isSkipEmptyBuckets()) {
- //Usally it is NOT Okay to materialize results via toList(), but
Granularity is ALL thus we have only one record
+ // When granularity = ALL, there is no grouping key for this query.
+ // To be more sql-compliant, we should return something (e.g., 0 for
count queries) even when
+ // the sequence is empty.
+ if (query.getGranularity().equals(Granularities.ALL) &&
+ // Returns empty sequence if this query allows skipping empty buckets
+ !query.isSkipEmptyBuckets() &&
+ // Returns empty sequence if bySegment is set because bySegment
results are mostly used for
+ // caching in historicals or debugging where the exact results are
preferred.
+ !QueryContexts.isBySegment(query)) {
+ // Usually it is NOT okay to materialize results via toList(), but
Granularity is ALL thus
+ // we have only one record.
final List<Result<TimeseriesResultValue>> val = baseResults.toList();
finalSequence = val.isEmpty() ?
Sequences.simple(Collections.singletonList(
getNullTimeseriesResultValue(query))) : Sequences.simple(val);
diff --git a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
index f18764f..8823c07 100644
--- a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
@@ -214,10 +214,14 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
return true;
} else {
final List<SegmentDescriptor> missingSegments =
getMissingSegments(queryPlus, context);
+ final int maxNumRetries = QueryContexts.getNumRetriesOnMissingSegments(
+ queryPlus.getQuery(),
+ config.getNumTries()
+ );
if (missingSegments.isEmpty()) {
return false;
- } else if (retryCount >= config.getNumTries()) {
- if (!config.isReturnPartialResults()) {
+ } else if (retryCount >= maxNumRetries) {
+ if (!QueryContexts.allowReturnPartialResults(queryPlus.getQuery(),
config.isReturnPartialResults())) {
throw new SegmentMissingException("No results found for
segments[%s]", missingSegments);
} else {
return false;
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 79c7d94..222a0dd 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -204,35 +204,20 @@ public class ServerManager implements QuerySegmentWalker
analysis.getBaseQuery().orElse(query)
);
- FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
+ final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
.transformCat(
- descriptor -> {
- final PartitionHolder<ReferenceCountingSegment> entry =
timeline.findEntry(
- descriptor.getInterval(),
- descriptor.getVersion()
- );
-
- if (entry == null) {
- return Collections.singletonList(new
ReportTimelineMissingSegmentQueryRunner<>(descriptor));
- }
-
- final PartitionChunk<ReferenceCountingSegment> chunk =
entry.getChunk(descriptor.getPartitionNumber());
- if (chunk == null) {
- return Collections.singletonList(new
ReportTimelineMissingSegmentQueryRunner<>(descriptor));
- }
-
- final ReferenceCountingSegment segment = chunk.getObject();
- return Collections.singletonList(
- buildAndDecorateQueryRunner(
- factory,
- toolChest,
- segmentMapFn.apply(segment),
- descriptor,
- cpuTimeAccumulator
- )
- );
- }
+ descriptor -> Collections.singletonList(
+ buildQueryRunnerForSegment(
+ query,
+ descriptor,
+ factory,
+ toolChest,
+ timeline,
+ segmentMapFn,
+ cpuTimeAccumulator
+ )
+ )
);
return CPUTimeMetricQueryRunner.safeBuild(
@@ -247,6 +232,40 @@ public class ServerManager implements QuerySegmentWalker
);
}
+ <T> QueryRunner<T> buildQueryRunnerForSegment(
+ final Query<T> query,
+ final SegmentDescriptor descriptor,
+ final QueryRunnerFactory<T, Query<T>> factory,
+ final QueryToolChest<T, Query<T>> toolChest,
+ final VersionedIntervalTimeline<String, ReferenceCountingSegment>
timeline,
+ final Function<SegmentReference, SegmentReference> segmentMapFn,
+ final AtomicLong cpuTimeAccumulator
+ )
+ {
+ final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
+ descriptor.getInterval(),
+ descriptor.getVersion()
+ );
+
+ if (entry == null) {
+ return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+ }
+
+ final PartitionChunk<ReferenceCountingSegment> chunk =
entry.getChunk(descriptor.getPartitionNumber());
+ if (chunk == null) {
+ return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+ }
+
+ final ReferenceCountingSegment segment = chunk.getObject();
+ return buildAndDecorateQueryRunner(
+ factory,
+ toolChest,
+ segmentMapFn.apply(segment),
+ descriptor,
+ cpuTimeAccumulator
+ );
+ }
+
private <T> QueryRunner<T> buildAndDecorateQueryRunner(
final QueryRunnerFactory<T, Query<T>> factory,
final QueryToolChest<T, Query<T>> toolChest,
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index d2c15c1..d0c457a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -19,7 +19,9 @@
package org.apache.druid.cli;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
@@ -91,7 +93,7 @@ public class CliHistorical extends ServerRunnable
binder.bind(ServerManager.class).in(LazySingleton.class);
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
-
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
+ bindQuerySegmentWalker(binder);
binder.bind(ServerTypeConfig.class).toInstance(new
ServerTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
@@ -119,4 +121,13 @@ public class CliHistorical extends ServerRunnable
new LookupModule()
);
}
+
+ /**
+ * This method is visible for testing query retry on missing segments. See
{@link CliHistoricalForQueryRetryTest}.
+ */
+ @VisibleForTesting
+ public void bindQuerySegmentWalker(Binder binder)
+ {
+
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]