This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fdce36  Add integration tests for query retry on missing segments 
(#10171)
6fdce36 is described below

commit 6fdce36e41b109f74077c0a68fc2d2ec5d771e29
Author: Jihoon Son <[email protected]>
AuthorDate: Wed Jul 22 22:30:35 2020 -0700

    Add integration tests for query retry on missing segments (#10171)
    
    * Add integration tests for query retry on missing segments
    
    * add missing dependencies; fix travis conf
    
    * address comments
    
    * Integration tests extension
    
    * remove unused dependency
    
    * remove druid_main
    
    * fix java agent port
---
 .travis.yml                                        |  19 +-
 .../org/apache/druid/cli/CliCommandCreator.java    |   5 +
 .../apache/druid/indexing/common/task/Tasks.java   |   2 +-
 .../docker/docker-compose.query-retry-test.yml     | 128 +++++++++++
 integration-tests/docker/druid.sh                  |   9 +-
 .../historical-for-query-retry-test                |  33 +++
 .../docker/test-data/query-retry-sample-data.sql   |  20 ++
 integration-tests/pom.xml                          |   9 +-
 integration-tests/script/copy_resources.sh         |  16 +-
 integration-tests/script/docker_run_cluster.sh     |  13 +-
 .../druid/cli/CliHistoricalForQueryRetryTest.java  |  56 +++++
 .../druid/cli/QueryRetryTestCommandCreator.java    |  14 +-
 .../ServerManagerForQueryRetryTest.java            | 141 +++++++++++++
 .../druid/testing/utils/QueryWithResults.java      |   7 +-
 .../org.apache.druid.cli.CliCommandCreator}        |  15 +-
 .../java/org/apache/druid/tests/TestNGGroup.java   |   2 +
 .../query/ITQueryRetryTestOnMissingSegments.java   | 235 +++++++++++++++++++++
 .../druid/tests/query/ITWikipediaQueryTest.java    |   1 -
 ...ipedia_editstream_queries_query_retry_test.json |  26 +++
 integration-tests/stop_cluster.sh                  |   2 +-
 .../java/org/apache/druid/query/QueryContexts.java |  15 +-
 .../timeseries/TimeseriesQueryQueryToolChest.java  |  14 +-
 .../org/apache/druid/query/RetryQueryRunner.java   |   8 +-
 .../druid/server/coordination/ServerManager.java   |  73 ++++---
 .../java/org/apache/druid/cli/CliHistorical.java   |  13 +-
 25 files changed, 799 insertions(+), 77 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 6378c72..4bb73bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -246,7 +246,7 @@ jobs:
       <<: *test_processing_module
       name: "(openjdk8) other modules test"
       env:
-        - 
MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console'
+        - 
MAVEN_PROJECTS='!processing,!indexing-hadoop,!indexing-service,!extensions-core/kafka-indexing-service,!extensions-core/kinesis-indexing-service,!server,!web-console,!integration-tests'
 
     - <<: *test_other_modules
       name: "(openjdk11) other modules test"
@@ -385,6 +385,14 @@ jobs:
       script: *run_integration_test
       after_failure: *integration_test_diags
 
+    - &integration_query_retry
+      name: "(Compile=openjdk8, Run=openjdk8) query retry integration test for 
missing segments"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
     - &integration_security
       name: "(Compile=openjdk8, Run=openjdk8) security integration test"
       jdk: openjdk8
@@ -413,7 +421,7 @@ jobs:
       name: "(Compile=openjdk8, Run=openjdk8) other integration test"
       jdk: openjdk8
       services: *integration_test_services
-      env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-sto
 [...]
+      env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-h
 [...]
       script: *run_integration_test
       after_failure: *integration_test_diags
     # END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -444,6 +452,11 @@ jobs:
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=query' JVM_RUNTIME='-Djvm.runtime=11'
 
+    - <<: *integration_query_retry
+      name: "(Compile=openjdk8, Run=openjdk11) query retry integration test 
for missing segments"
+      jdk: openjdk8
+      env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11'
+
     - <<: *integration_security
       name: "(Compile=openjdk8, Run=openjdk11) security integration test"
       jdk: openjdk8
@@ -462,7 +475,7 @@ jobs:
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-sto
 [...]
+      env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-h
 [...]
     # END - Integration tests for Compile with Java 8 and Run with Java 11
 
     - name: "security vulnerabilities"
diff --git a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java 
b/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
index dc314a7..f554d16 100644
--- a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
+++ b/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
@@ -23,6 +23,11 @@ import io.airlift.airline.Cli;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
 /**
+ * An extension point to create a custom Druid service. Druid can understand 
and execute custom commands
+ * to run services loaded via Druid's extension system (see {@code 
Initialization#getFromExtensions}). See
+ * the {@code Main} class for details of groups and commands.
+ *
+ * Implementations should be registered in the {@code 
META-INF/services/org.apache.druid.cli.CliCommandCreator} file.
  */
 @ExtensionPoint
 public interface CliCommandCreator
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index e16e52e..502eb85 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -51,7 +51,7 @@ public class Tasks
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
   /**
-   *This context is used in auto compaction. When it is set in the context, 
the segments created by the task
+   * This context is used in auto compaction. When it is set in the context, 
the segments created by the task
    * will fill 'lastCompactionState' in its metadata. This will be used to 
track what segments are compacted or not.
    * See {@link org.apache.druid.timeline.DataSegment} and {@link
    * org.apache.druid.server.coordinator.duty.NewestSegmentFirstIterator} for 
more details.
diff --git a/integration-tests/docker/docker-compose.query-retry-test.yml 
b/integration-tests/docker/docker-compose.query-retry-test.yml
new file mode 100644
index 0000000..9b5a5a6
--- /dev/null
+++ b/integration-tests/docker/docker-compose.query-retry-test.yml
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+  druid-zookeeper-kafka:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-zookeeper-kafka
+
+  druid-metadata-storage:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-metadata-storage
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-zookeeper-kafka
+
+  druid-overlord:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-overlord
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-metadata-storage:druid-metadata-storage
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+    depends_on:
+      - druid-metadata-storage
+      - druid-zookeeper-kafka
+
+  druid-coordinator:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-coordinator
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-overlord:druid-overlord
+      - druid-metadata-storage:druid-metadata-storage
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+    depends_on:
+      - druid-overlord
+      - druid-metadata-storage
+      - druid-zookeeper-kafka
+
+  druid-historical:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-historical
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+    depends_on:
+      - druid-zookeeper-kafka
+
+  druid-broker:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-broker
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-historical:druid-historical
+    depends_on:
+      - druid-zookeeper-kafka
+      - druid-historical
+
+  druid-router:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-router
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+      - druid-coordinator:druid-coordinator
+      - druid-broker:druid-broker
+    depends_on:
+      - druid-zookeeper-kafka
+      - druid-coordinator
+      - druid-broker
+
+  druid-historical-for-query-retry-test:
+    image: druid/cluster
+    container_name: druid-historical-for-query-retry-test
+    networks:
+      druid-it-net:
+        ipv4_address: 172.172.172.13
+    ports:
+      - 8084:8083
+      - 8284:8283
+      - 5010:5007
+    privileged: true
+    volumes:
+      - ${HOME}/shared:/shared
+      - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf
+    env_file:
+      - ./environment-configs/common
+      - ./environment-configs/historical-for-query-retry-test
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    links:
+      - druid-zookeeper-kafka:druid-zookeeper-kafka
+    depends_on:
+      - druid-zookeeper-kafka
+
+networks:
+  druid-it-net:
+    name: druid-it-net
+    ipam:
+      config:
+        - subnet: 172.172.172.0/24
\ No newline at end of file
diff --git a/integration-tests/docker/druid.sh 
b/integration-tests/docker/druid.sh
index a17df11..4896102 100755
--- a/integration-tests/docker/druid.sh
+++ b/integration-tests/docker/druid.sh
@@ -23,6 +23,7 @@ getConfPath()
     case "$1" in
     _common) echo $cluster_conf_base/_common ;;
     historical) echo $cluster_conf_base/data/historical ;;
+    historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;;
     middleManager) echo $cluster_conf_base/data/middleManager ;;
     coordinator) echo $cluster_conf_base/master/coordinator ;;
     broker) echo $cluster_conf_base/query/broker ;;
@@ -82,14 +83,18 @@ setupData()
   # The "query" and "security" test groups require data to be setup before 
running the tests.
   # In particular, they requires segments to be download from a pre-existing 
s3 bucket.
   # This is done by using the loadSpec put into metadatastore and s3 
credientials set below.
-  if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ 
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
+  if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ 
"$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ 
"$DRUID_INTEGRATION_TEST_GROUP" = "security" ]; then
     # touch is needed because OverlayFS's copy-up operation breaks POSIX 
standards. See https://github.com/docker/for-linux/issues/72.
     find /var/lib/mysql -type f -exec touch {} \; && service mysql start \
       && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | 
mysql -u root druid && /etc/init.d/mysql stop
     # below s3 credentials needed to access the pre-existing s3 bucket
     setKey $DRUID_SERVICE druid.s3.accessKey AKIAJI7DG7CDECGBQ6NA
     setKey $DRUID_SERVICE druid.s3.secretKey 
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
-    setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
+    if [[ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]]; then
+      setKey $DRUID_SERVICE druid.extensions.loadList 
[\"druid-s3-extensions\",\"druid-integration-tests\"]
+    else
+      setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"]
+    fi
     # The region of the sample data s3 blobs needed for these test groups
     export AWS_REGION=us-east-1
   fi
diff --git 
a/integration-tests/docker/environment-configs/historical-for-query-retry-test 
b/integration-tests/docker/environment-configs/historical-for-query-retry-test
new file mode 100644
index 0000000..7cbe6d9
--- /dev/null
+++ 
b/integration-tests/docker/environment-configs/historical-for-query-retry-test
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+DRUID_SERVICE=historical-for-query-retry-test
+DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log
+
+# JAVA OPTS
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m 
-XX:MaxNewSize=256m -XX:+UseG1GC 
-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010
+
+# Druid configs
+druid_processing_buffer_sizeBytes=25000000
+druid_processing_numThreads=2
+druid_query_groupBy_maxOnDiskStorage=300000000
+druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-retry-test","maxSize":5000000000}]
+druid_server_maxSize=5000000000
+druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-retry-test
+druid_server_https_crlPath=/tls/revocations.crl
diff --git a/integration-tests/docker/test-data/query-retry-sample-data.sql 
b/integration-tests/docker/test-data/query-retry-sample-data.sql
new file mode 100644
index 0000000..18ab48a
--- /dev/null
+++ b/integration-tests/docker/test-data/query-retry-sample-data.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES 
('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{
 [...]
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES 
('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{
 [...]
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES 
('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{
 [...]
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES 
('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830
 [...]
+INSERT INTO druid_segments (id, dataSource, created_date, start, end, 
partitioned, version, used, payload) VALUES 
('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z',
 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', 
'2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', 
'{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\
 [...]
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 4d64c66..4fee910 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -46,6 +46,14 @@
             <artifactId>opencsv</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>airline</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-kinesis</artifactId>
             <version>${aws.sdk.version}</version>
@@ -195,7 +203,6 @@
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-services</artifactId>
             <version>${project.parent.version}</version>
-            <scope>runtime</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.druid</groupId>
diff --git a/integration-tests/script/copy_resources.sh 
b/integration-tests/script/copy_resources.sh
index 0324a69..6ecd307 100755
--- a/integration-tests/script/copy_resources.sh
+++ b/integration-tests/script/copy_resources.sh
@@ -32,7 +32,17 @@ rm -rf $SHARED_DIR/docker
 cp -R docker $SHARED_DIR/docker
 mvn -B dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
 
+# install logging config
+cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
+
+# copy the integration test jar, it provides test-only extension 
implementations
+cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
+
 # move extensions into a seperate extension folder
+# For druid-integration-tests
+mkdir -p $SHARED_DIR/docker/extensions/druid-integration-tests
+# We don't want to copy tests jar.
+cp $SHARED_DIR/docker/lib/druid-integration-tests-*[^s].jar 
$SHARED_DIR/docker/extensions/druid-integration-tests
 # For druid-s3-extensions
 mkdir -p $SHARED_DIR/docker/extensions/druid-s3-extensions
 mv $SHARED_DIR/docker/lib/druid-s3-extensions-* 
$SHARED_DIR/docker/extensions/druid-s3-extensions
@@ -64,12 +74,6 @@ then
   curl 
https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar 
--output $SHARED_DIR/docker/lib/gcs-connector-hadoop2-latest.jar
 fi
 
-# install logging config
-cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml
-
-# copy the integration test jar, it provides test-only extension 
implementations
-cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
-
 # one of the integration tests needs the wikiticker sample data
 mkdir -p $SHARED_DIR/wikiticker-it
 cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz 
$SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
diff --git a/integration-tests/script/docker_run_cluster.sh 
b/integration-tests/script/docker_run_cluster.sh
index 8f3bd92..c5faa4a 100755
--- a/integration-tests/script/docker_run_cluster.sh
+++ b/integration-tests/script/docker_run_cluster.sh
@@ -49,11 +49,16 @@ fi
   then
      if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ]
      then
-      # Start default Druid services and additional druid router 
(custom-check-tls, permissive-tls, no-client-auth-tls)
-      docker-compose -f ${DOCKERDIR}/docker-compose.yml -f 
${DOCKERDIR}/docker-compose.security.yml up -d
+       # Start default Druid services and additional druid router 
(custom-check-tls, permissive-tls, no-client-auth-tls)
+       docker-compose -f ${DOCKERDIR}/docker-compose.yml -f 
${DOCKERDIR}/docker-compose.security.yml up -d
+     elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ]
+     then
+       # Start default Druid services with an additional historical modified 
for query retry test
+       # See CliHistoricalForQueryRetryTest.
+       docker-compose -f ${DOCKERDIR}/docker-compose.query-retry-test.yml up -d
      else
-      # Start default Druid services
-      docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
+       # Start default Druid services
+       docker-compose -f ${DOCKERDIR}/docker-compose.yml up -d
      fi
   else
     # run druid cluster with override config
diff --git 
a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
 
b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
new file mode 100644
index 0000000..6663e99
--- /dev/null
+++ 
b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.cli;
+
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import io.airlift.airline.Command;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
+
+import java.util.Properties;
+
+@Command(
+    name = "historical-for-query-retry-test",
+    description = "Runs a Historical node modified for query retry test"
+)
+public class CliHistoricalForQueryRetryTest extends CliHistorical
+{
+  private static final Logger log = new 
Logger(CliHistoricalForQueryRetryTest.class);
+
+  public CliHistoricalForQueryRetryTest()
+  {
+    super();
+  }
+
+  @Inject
+  public void configure(Properties properties)
+  {
+    log.info("Historical is configured for testing query retry on missing 
segments");
+  }
+
+  @Override
+  public void bindQuerySegmentWalker(Binder binder)
+  {
+    
binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryRetryTest.class).in(LazySingleton.class);
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java 
b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
similarity index 76%
copy from core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
copy to 
integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
index dc314a7..9635c5a 100644
--- a/core/src/main/java/org/apache/druid/cli/CliCommandCreator.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java
@@ -19,13 +19,13 @@
 
 package org.apache.druid.cli;
 
-import io.airlift.airline.Cli;
-import org.apache.druid.guice.annotations.ExtensionPoint;
+import io.airlift.airline.Cli.CliBuilder;
 
-/**
- */
-@ExtensionPoint
-public interface CliCommandCreator
+public class QueryRetryTestCommandCreator implements CliCommandCreator
 {
-  void addCommands(Cli.CliBuilder builder);
+  @Override
+  public void addCommands(CliBuilder builder)
+  {
+    
builder.withGroup("server").withCommands(CliHistoricalForQueryRetryTest.class);
+  }
 }
diff --git 
a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
 
b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
new file mode 100644
index 0000000..b81b165
--- /dev/null
+++ 
b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.guice.annotations.Processing;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.ReferenceCountingSegment;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.server.SegmentManager;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+/**
+ * This server manager is designed to test query retry on missing segments. A 
segment can be missing during a query
+ * if a historical drops the segment after the broker issues the query to the 
historical. To mimic this situation,
+ * the historical with this server manager announces all segments assigned, 
but reports missing segments for the
+ * first 3 segments specified in the query.
+ *
+ * @see org.apache.druid.query.RetryQueryRunner
+ */
+public class ServerManagerForQueryRetryTest extends ServerManager
+{
+  // Query context key that indicates this query is for query retry testing.
+  public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test";
+
+  private static final Logger LOG = new 
Logger(ServerManagerForQueryRetryTest.class);
+  private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
+
+  private final ConcurrentHashMap<String, Set<SegmentDescriptor>> 
queryToIgnoredSegments = new ConcurrentHashMap<>();
+
+  @Inject
+  public ServerManagerForQueryRetryTest(
+      QueryRunnerFactoryConglomerate conglomerate,
+      ServiceEmitter emitter,
+      @Processing ExecutorService exec,
+      CachePopulator cachePopulator,
+      @Smile ObjectMapper objectMapper,
+      Cache cache,
+      CacheConfig cacheConfig,
+      SegmentManager segmentManager,
+      JoinableFactory joinableFactory,
+      ServerConfig serverConfig
+  )
+  {
+    super(
+        conglomerate,
+        emitter,
+        exec,
+        cachePopulator,
+        objectMapper,
+        cache,
+        cacheConfig,
+        segmentManager,
+        joinableFactory,
+        serverConfig
+    );
+  }
+
+  @Override
+  <T> QueryRunner<T> buildQueryRunnerForSegment(
+      Query<T> query,
+      SegmentDescriptor descriptor,
+      QueryRunnerFactory<T, Query<T>> factory,
+      QueryToolChest<T, Query<T>> toolChest,
+      VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
+      Function<SegmentReference, SegmentReference> segmentMapFn,
+      AtomicLong cpuTimeAccumulator
+  )
+  {
+    if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) {
+      final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
+      queryToIgnoredSegments.compute(
+          query.getMostSpecificId(),
+          (queryId, ignoredSegments) -> {
+            if (ignoredSegments == null) {
+              ignoredSegments = new HashSet<>();
+            }
+            if (ignoredSegments.size() < 
MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
+              ignoredSegments.add(descriptor);
+              isIgnoreSegment.setTrue();
+            }
+            return ignoredSegments;
+          }
+      );
+
+      if (isIgnoreSegment.isTrue()) {
+        LOG.info("Pretending I don't have segment[%s]", descriptor);
+        return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+      }
+    }
+    return super.buildQueryRunnerForSegment(
+        query,
+        descriptor,
+        factory,
+        toolChest,
+        timeline,
+        segmentMapFn,
+        cpuTimeAccumulator
+    );
+  }
+}
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
index 9f39bd5..13476bd 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/QueryWithResults.java
@@ -20,6 +20,7 @@
 package org.apache.druid.testing.utils;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.query.Query;
 
 import java.util.List;
@@ -27,14 +28,12 @@ import java.util.Map;
 
 public class QueryWithResults extends AbstractQueryWithResults<Query>
 {
-
   @JsonCreator
   public QueryWithResults(
-      Query query,
-      List<Map<String, Object>> expectedResults
+      @JsonProperty("query") Query query,
+      @JsonProperty("expectedResults") List<Map<String, Object>> 
expectedResults
   )
   {
     super(query, expectedResults);
   }
-
 }
diff --git a/integration-tests/stop_cluster.sh 
b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
old mode 100755
new mode 100644
similarity index 58%
copy from integration-tests/stop_cluster.sh
copy to 
integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
index d75e73f..70c37f6
--- a/integration-tests/stop_cluster.sh
+++ 
b/integration-tests/src/main/resources/META-INF/services/org.apache.druid.cli.CliCommandCreator
@@ -1,4 +1,3 @@
-#!/usr/bin/env bash
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -14,16 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Skip stopping docker if flag set (For use during development)
-if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ 
"$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" == true ]
-  then
-    exit 0
-  fi
-
-for node in druid-historical druid-coordinator druid-overlord druid-router 
druid-router-permissive-tls druid-router-no-client-auth-tls 
druid-router-custom-check-tls druid-broker druid-middlemanager 
druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
-do
-  docker stop $node
-  docker rm $node
-done
-
-docker network rm druid-it-net
+org.apache.druid.cli.QueryRetryTestCommandCreator
\ No newline at end of file
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java 
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index d759ff2..01d1e91 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -52,6 +52,8 @@ public class TestNGGroup
    */
   public static final String QUERY = "query";
 
+  public static final String QUERY_RETRY = "query-retry";
+
   public static final String REALTIME_INDEX = "realtime-index";
 
   /**
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
new file mode 100644
index 0000000..817ee0b
--- /dev/null
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.QueryResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.QueryResultVerifier;
+import org.apache.druid.testing.utils.QueryWithResults;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractIndexerTest;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class tests the query retry on missing segments. A segment can be 
missing in a historical during a query if
+ * the historical drops the segment after the broker issues the query to the 
historical. To mimic this case, this
+ * test spawns two historicals, a normal historical and a historical modified 
for testing. The later historical
+ * announces all segments assigned, but doesn't serve all of them. Instead, it 
can report missing segments for some
+ * segments. See {@link ServerManagerForQueryRetryTest} for more details.
+ *
+ * To run this test properly, the test group must be specified as {@link 
TestNGGroup#QUERY_RETRY}.
+ */
+@Test(groups = TestNGGroup.QUERY_RETRY)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITQueryRetryTestOnMissingSegments
+{
+  private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
+  private static final String QUERIES_RESOURCE = 
"/queries/wikipedia_editstream_queries_query_retry_test.json";
+  private static final int TIMES_TO_RUN = 50;
+
+  /**
+   * This test runs the same query multiple times. This enumeration represents 
an expectation after finishing
+   * running the query.
+   */
+  private enum Expectation
+  {
+    /**
+     * Expect that all runs succeed.
+     */
+    ALL_SUCCESS,
+    /**
+     * Expect that all runs returns the 200 HTTP response, but some of them 
can return incorrect result.
+     */
+    INCORRECT_RESULT,
+    /**
+     * Expect that some runs can return the 500 HTTP response. For the runs 
returned the 200 HTTP response, the query
+     * result must be correct.
+     */
+    QUERY_FAILURE
+  }
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+  @Inject
+  private TestQueryHelper queryHelper;
+  @Inject
+  private QueryResourceTestClient queryClient;
+  @Inject
+  private IntegrationTestingConfig config;
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @BeforeMethod
+  public void before()
+  {
+    // ensure that wikipedia segments are loaded completely
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), 
"wikipedia segment load"
+    );
+  }
+
+  @Test
+  public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
+  {
+    // Since retry is disabled and partial result is not allowed, we can 
expect some queries can fail.
+    // If a query succeed, its result must be correct.
+    testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
+  }
+
+  @Test
+  public void testWithRetriesDisabledPartialResultAllowed() throws Exception
+  {
+    // Since retry is disabled but partial result is allowed, all queries must 
succeed.
+    // However, some queries can return incorrect result.
+    testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
+  }
+
+  @Test
+  public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
+  {
+    // Since retry is enabled, all queries must succeed even though partial 
result is disallowed.
+    // All queries must return correct result.
+    testQueries(buildQuery(30, false), Expectation.ALL_SUCCESS);
+  }
+
+  private void testQueries(String queryWithResultsStr, Expectation 
expectation) throws Exception
+  {
+    final List<QueryWithResults> queries = jsonMapper.readValue(
+        queryWithResultsStr,
+        new TypeReference<List<QueryWithResults>>() {}
+    );
+    testQueries(queries, expectation);
+  }
+
+  private void testQueries(List<QueryWithResults> queries, Expectation 
expectation) throws Exception
+  {
+    int querySuccess = 0;
+    int queryFailure = 0;
+    int resultMatches = 0;
+    int resultMismatches = 0;
+    for (int i = 0; i < TIMES_TO_RUN; i++) {
+      for (QueryWithResults queryWithResult : queries) {
+        final StatusResponseHolder responseHolder = queryClient
+            .queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()), 
queryWithResult.getQuery())
+            .get();
+
+        if (responseHolder.getStatus().getCode() == 
HttpResponseStatus.OK.getCode()) {
+          querySuccess++;
+
+          List<Map<String, Object>> result = jsonMapper.readValue(
+              responseHolder.getContent(),
+              new TypeReference<List<Map<String, Object>>>() {}
+          );
+          if (!QueryResultVerifier.compareResults(result, 
queryWithResult.getExpectedResults())) {
+            if (expectation != Expectation.INCORRECT_RESULT) {
+              throw new ISE(
+                  "Incorrect query results for query %s \n expectedResults: %s 
\n actualResults : %s",
+                  queryWithResult.getQuery(),
+                  
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
+                  jsonMapper.writeValueAsString(result)
+              );
+            } else {
+              resultMismatches++;
+            }
+          } else {
+            resultMatches++;
+          }
+        } else if (responseHolder.getStatus().getCode() == 
HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
+                   expectation == Expectation.QUERY_FAILURE) {
+          final Map<String, Object> response = 
jsonMapper.readValue(responseHolder.getContent(), Map.class);
+          final String errorMessage = (String) response.get("errorMessage");
+          Assert.assertNotNull(errorMessage, "errorMessage");
+          Assert.assertTrue(errorMessage.contains("No results found for 
segments"));
+          queryFailure++;
+        } else {
+          throw new ISE(
+              "Unexpected failure, code: [%s], content: [%s]",
+              responseHolder.getStatus(),
+              responseHolder.getContent()
+          );
+        }
+      }
+    }
+
+    switch (expectation) {
+      case ALL_SUCCESS:
+        Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, 
querySuccess);
+        Assert.assertEquals(0, queryFailure);
+        Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, 
resultMatches);
+        Assert.assertEquals(0, resultMismatches);
+        break;
+      case QUERY_FAILURE:
+        Assert.assertTrue(querySuccess > 0, "At least one query is expected to 
succeed.");
+        Assert.assertTrue(queryFailure > 0, "At least one query is expected to 
fail.");
+        Assert.assertEquals(querySuccess, resultMatches);
+        Assert.assertEquals(0, resultMismatches);
+        break;
+      case INCORRECT_RESULT:
+        Assert.assertEquals(ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN, 
querySuccess);
+        Assert.assertEquals(0, queryFailure);
+        Assert.assertTrue(resultMatches > 0, "At least one query is expected 
to return correct results.");
+        Assert.assertTrue(resultMismatches > 0, "At least one query is 
expected to return less results.");
+        break;
+      default:
+        throw new ISE("Unknown expectation[%s]", expectation);
+    }
+  }
+
+  private String buildQuery(int numRetriesOnMissingSegments, boolean 
allowPartialResults) throws IOException
+  {
+    return StringUtils.replace(
+        AbstractIndexerTest.getResourceAsString(QUERIES_RESOURCE),
+        "%%CONTEXT%%",
+        
jsonMapper.writeValueAsString(buildContext(numRetriesOnMissingSegments, 
allowPartialResults))
+    );
+  }
+
+  private static Map<String, Object> buildContext(int 
numRetriesOnMissingSegments, boolean allowPartialResults)
+  {
+    final Map<String, Object> context = new HashMap<>();
+    // Disable cache so that each run hits historical.
+    context.put(QueryContexts.USE_CACHE_KEY, false);
+    context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 
numRetriesOnMissingSegments);
+    context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
+    context.put(ServerManagerForQueryRetryTest.QUERY_RETRY_TEST_CONTEXT_KEY, 
true);
+    return context;
+  }
+}
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
index 389d33e..858b72d 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
@@ -64,7 +64,6 @@ public class ITWikipediaQueryTest
   @BeforeMethod
   public void before() throws Exception
   {
-
     // ensure that wikipedia segments are loaded completely
     ITRetryUtil.retryUntilTrue(
         () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), 
"wikipedia segment load"
diff --git 
a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
 
b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
new file mode 100644
index 0000000..0886cf4
--- /dev/null
+++ 
b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries_query_retry_test.json
@@ -0,0 +1,26 @@
+[
+    {
+        "description": "timeseries, 1 agg, all",
+        "query": {
+            "queryType": "timeseries",
+            "dataSource": "wikipedia_editstream",
+            "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
+            "granularity": "all",
+            "aggregations": [
+                {
+                    "type": "count",
+                    "name": "rows"
+                }
+            ],
+            "context": %%CONTEXT%%
+        },
+        "expectedResults": [
+            {
+                "timestamp": "2013-01-01T00:00:00.000Z",
+                "result": {
+                    "rows": 2390950
+                }
+            }
+        ]
+    }
+]
diff --git a/integration-tests/stop_cluster.sh 
b/integration-tests/stop_cluster.sh
index d75e73f..92f9c41 100755
--- a/integration-tests/stop_cluster.sh
+++ b/integration-tests/stop_cluster.sh
@@ -20,7 +20,7 @@ if [ -n "$DRUID_INTEGRATION_TEST_SKIP_RUN_DOCKER" ] && [ 
"$DRUID_INTEGRATION_TES
     exit 0
   fi
 
-for node in druid-historical druid-coordinator druid-overlord druid-router 
druid-router-permissive-tls druid-router-no-client-auth-tls 
druid-router-custom-check-tls druid-broker druid-middlemanager 
druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
+for node in druid-historical druid-historical-for-query-retry-test 
druid-coordinator druid-overlord druid-router druid-router-permissive-tls 
druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker 
druid-middlemanager druid-zookeeper-kafka druid-metadata-storage 
druid-it-hadoop;
 do
   docker stop $node
   docker rm $node
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 3667593..b70e8cf 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -53,6 +53,9 @@ public class QueryContexts
   public static final String 
JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS_ENABLE_KEY = 
"enableJoinFilterRewriteValueColumnFilters";
   public static final String JOIN_FILTER_REWRITE_MAX_SIZE_KEY = 
"joinFilterRewriteMaxSize";
   public static final String USE_FILTER_CNF_KEY = "useFilterCNF";
+  public static final String NUM_RETRIES_ON_MISSING_SEGMENTS_KEY = 
"numRetriesOnMissingSegments";
+  public static final String RETURN_PARTIAL_RESULTS_KEY = 
"returnPartialResults";
+  public static final String USE_CACHE_KEY = "useCache";
 
   public static final boolean DEFAULT_BY_SEGMENT = false;
   public static final boolean DEFAULT_POPULATE_CACHE = true;
@@ -143,7 +146,7 @@ public class QueryContexts
 
   public static <T> boolean isUseCache(Query<T> query, boolean defaultValue)
   {
-    return parseBoolean(query, "useCache", defaultValue);
+    return parseBoolean(query, USE_CACHE_KEY, defaultValue);
   }
 
   public static <T> boolean isPopulateResultLevelCache(Query<T> query)
@@ -344,6 +347,16 @@ public class QueryContexts
     return defaultTimeout;
   }
 
+  public static <T> int getNumRetriesOnMissingSegments(Query<T> query, int 
defaultValue)
+  {
+    return query.getContextValue(NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 
defaultValue);
+  }
+
+  public static <T> boolean allowReturnPartialResults(Query<T> query, boolean 
defaultValue)
+  {
+    return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
+  }
+
   static <T> long parseLong(Query<T> query, String key, long defaultValue)
   {
     final Object val = query.getContextValue(key);
diff --git 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 2718282..9b4c94b 100644
--- 
a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ 
b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.CacheStrategy;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryToolChest;
@@ -138,8 +139,17 @@ public class TimeseriesQueryQueryToolChest extends 
QueryToolChest<Result<Timeser
 
       final Sequence<Result<TimeseriesResultValue>> finalSequence;
 
-      if (query.getGranularity().equals(Granularities.ALL) && 
!query.isSkipEmptyBuckets()) {
-        //Usally it is NOT Okay to materialize results via toList(), but 
Granularity is ALL thus we have only one record
+      // When granularity = ALL, there is no grouping key for this query.
+      // To be more sql-compliant, we should return something (e.g., 0 for 
count queries) even when
+      // the sequence is empty.
+      if (query.getGranularity().equals(Granularities.ALL) &&
+          // Returns empty sequence if this query allows skipping empty buckets
+          !query.isSkipEmptyBuckets() &&
+          // Returns empty sequence if bySegment is set because bySegment 
results are mostly used for
+          // caching in historicals or debugging where the exact results are 
preferred.
+          !QueryContexts.isBySegment(query)) {
+        // Usally it is NOT Okay to materialize results via toList(), but 
Granularity is ALL thus
+        // we have only one record.
         final List<Result<TimeseriesResultValue>> val = baseResults.toList();
         finalSequence = val.isEmpty() ? 
Sequences.simple(Collections.singletonList(
             getNullTimeseriesResultValue(query))) : Sequences.simple(val);
diff --git a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java 
b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
index f18764f..8823c07 100644
--- a/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
+++ b/server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
@@ -214,10 +214,14 @@ public class RetryQueryRunner<T> implements QueryRunner<T>
         return true;
       } else {
         final List<SegmentDescriptor> missingSegments = 
getMissingSegments(queryPlus, context);
+        final int maxNumRetries = QueryContexts.getNumRetriesOnMissingSegments(
+            queryPlus.getQuery(),
+            config.getNumTries()
+        );
         if (missingSegments.isEmpty()) {
           return false;
-        } else if (retryCount >= config.getNumTries()) {
-          if (!config.isReturnPartialResults()) {
+        } else if (retryCount >= maxNumRetries) {
+          if (!QueryContexts.allowReturnPartialResults(queryPlus.getQuery(), 
config.isReturnPartialResults())) {
             throw new SegmentMissingException("No results found for 
segments[%s]", missingSegments);
           } else {
             return false;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java 
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 79c7d94..222a0dd 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -204,35 +204,20 @@ public class ServerManager implements QuerySegmentWalker
         analysis.getBaseQuery().orElse(query)
     );
 
-    FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
+    final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
         .create(specs)
         .transformCat(
-            descriptor -> {
-              final PartitionHolder<ReferenceCountingSegment> entry = 
timeline.findEntry(
-                  descriptor.getInterval(),
-                  descriptor.getVersion()
-              );
-
-              if (entry == null) {
-                return Collections.singletonList(new 
ReportTimelineMissingSegmentQueryRunner<>(descriptor));
-              }
-
-              final PartitionChunk<ReferenceCountingSegment> chunk = 
entry.getChunk(descriptor.getPartitionNumber());
-              if (chunk == null) {
-                return Collections.singletonList(new 
ReportTimelineMissingSegmentQueryRunner<>(descriptor));
-              }
-
-              final ReferenceCountingSegment segment = chunk.getObject();
-              return Collections.singletonList(
-                  buildAndDecorateQueryRunner(
-                      factory,
-                      toolChest,
-                      segmentMapFn.apply(segment),
-                      descriptor,
-                      cpuTimeAccumulator
-                  )
-              );
-            }
+            descriptor -> Collections.singletonList(
+                buildQueryRunnerForSegment(
+                    query,
+                    descriptor,
+                    factory,
+                    toolChest,
+                    timeline,
+                    segmentMapFn,
+                    cpuTimeAccumulator
+                )
+            )
         );
 
     return CPUTimeMetricQueryRunner.safeBuild(
@@ -247,6 +232,40 @@ public class ServerManager implements QuerySegmentWalker
     );
   }
 
+  <T> QueryRunner<T> buildQueryRunnerForSegment(
+      final Query<T> query,
+      final SegmentDescriptor descriptor,
+      final QueryRunnerFactory<T, Query<T>> factory,
+      final QueryToolChest<T, Query<T>> toolChest,
+      final VersionedIntervalTimeline<String, ReferenceCountingSegment> 
timeline,
+      final Function<SegmentReference, SegmentReference> segmentMapFn,
+      final AtomicLong cpuTimeAccumulator
+  )
+  {
+    final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
+        descriptor.getInterval(),
+        descriptor.getVersion()
+    );
+
+    if (entry == null) {
+      return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+    }
+
+    final PartitionChunk<ReferenceCountingSegment> chunk = 
entry.getChunk(descriptor.getPartitionNumber());
+    if (chunk == null) {
+      return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+    }
+
+    final ReferenceCountingSegment segment = chunk.getObject();
+    return buildAndDecorateQueryRunner(
+        factory,
+        toolChest,
+        segmentMapFn.apply(segment),
+        descriptor,
+        cpuTimeAccumulator
+    );
+  }
+
   private <T> QueryRunner<T> buildAndDecorateQueryRunner(
       final QueryRunnerFactory<T, Query<T>> factory,
       final QueryToolChest<T, Query<T>> toolChest,
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java 
b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index d2c15c1..d0c457a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.cli;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
 import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
@@ -91,7 +93,7 @@ public class CliHistorical extends ServerRunnable
           binder.bind(ServerManager.class).in(LazySingleton.class);
           binder.bind(SegmentManager.class).in(LazySingleton.class);
           binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
-          
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
+          bindQuerySegmentWalker(binder);
 
           binder.bind(ServerTypeConfig.class).toInstance(new 
ServerTypeConfig(ServerType.HISTORICAL));
           
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
@@ -119,4 +121,13 @@ public class CliHistorical extends ServerRunnable
         new LookupModule()
     );
   }
+
+  /**
+   * This method is visible for testing query retry on missing segments. See 
{@link CliHistoricalForQueryRetryTest}.
+   */
+  @VisibleForTesting
+  public void bindQuerySegmentWalker(Binder binder)
+  {
+    
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to