gianm commented on a change in pull request #10082:
URL: https://github.com/apache/druid/pull/10082#discussion_r446335802



##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
+import org.apache.druid.java.util.common.guava.MergeSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.segment.SegmentMissingException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.BiFunction;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+  private static final Logger LOG = new Logger(RetryQueryRunner.class);
+
+  private final QueryRunner<T> baseRunner;
+  private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn;
+  private final RetryQueryRunnerConfig config;
+  private final ObjectMapper jsonMapper;
+
+  private int numTotalRetries;
+
+  public RetryQueryRunner(
+      QueryRunner<T> baseRunner,
+      BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn,
+      RetryQueryRunnerConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.baseRunner = baseRunner;
+    this.retryRunnerCreateFn = retryRunnerCreateFn;
+    this.config = config;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @VisibleForTesting
+  int getNumTotalRetries()

Review comment:
       Should be `getTotalNumRetries()`. "Num total retries" makes it sound 
like the number of times we did a total retry.

##########
File path: 
processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -112,6 +112,16 @@
         "uncoveredIntervalsOverflowed",
             (oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
     ),
+    /**
+     * Expected remaining number of responses from query nodes.
+     * The value is initialized in {@code CachingClusteredClient} when it 
initializes the connection to the query nodes,
+     * and is updated whenever they respond (@code DirectDruidClient). {@code 
RetryQueryRunner} uses this value to
+     * check if the {@link #MISSING_SEGMENTS} is valid.
+     */
+    REMAINING_RESPONSES_FROM_QUERY_NODES(

Review comment:
       This should be added to 
`DirectDruidClient.removeMagicResponseContextFields` or something similar, so 
it doesn't end up in the response to the user.

##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest

Review comment:
       A general comment on testing: we should have a real integration test for 
this. It is the kind of thing that is tough to get right in unit tests, as 
evidenced by the fact that we already had an existing RetryQueryRunnerTest, but 
it isn't testing the right thing.
   
   I reviewed the unit test and it seems reasonable enough (with some 
comments), but for this particular functionality we need an integration test 
that uses the real servers and a real network.

##########
File path: 
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
##########
@@ -618,6 +619,7 @@ private void addSequencesFromServer(
         final SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer
     )
     {
+      responseContext.put(Key.REMAINING_RESPONSES_FROM_QUERY_NODES, 
segmentsByServer.size());

Review comment:
       I'm not sure this will work properly if there are multiple CCC queries 
associated with a given Druid query. This can happen with union queries and 
with subqueries. I believe they share response contexts, and this `put` would 
potentially cause things to get clobbered. Could you please look into this?
   
   Btw, the reason I believe they share contexts is that I only currently see 
concurrent contexts getting created in QueryLifecycle (which can wrap multiple 
CCC queries).
   
   If it is a real issue, we might be able to address it by splitting out the 
number of servers remaining by subquery ID. However, in that case, we need to 
make sure subquery ID is set for union datasources. I don't think it is 
currently.

##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest
+{
+  private static final Closer CLOSER = Closer.create();
+  private static final String DATASOURCE = "datasource";
+  private static final GeneratorSchemaInfo SCHEMA_INFO = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final QueryToolChestWarehouse toolChestWarehouse;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+
+  private SegmentGenerator segmentGenerator;
+  private TestHttpClient httpClient;
+  private SimpleServerView simpleServerView;
+  private CachingClusteredClient cachingClusteredClient;
+  private List<DruidServer> servers;
+
+  public RetryQueryRunnerTest()
+  {
+    conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, 
USE_PARALLEL_MERGE_POOL_CONFIGURED);
+
+    toolChestWarehouse = new QueryToolChestWarehouse()
+    {
+      @Override
+      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> 
getToolChest(final QueryType query)
+      {
+        return conglomerate.findFactory(query).getToolchest();
+      }
+    };
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    CLOSER.close();
+  }
+
+  @Before
+  public void setup()
+  {
+    segmentGenerator = new SegmentGenerator();
+    httpClient = new TestHttpClient(objectMapper);
+    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, 
httpClient);
+    cachingClusteredClient = new CachingClusteredClient(
+        toolChestWarehouse,
+        simpleServerView,
+        MapCache.create(0),
+        objectMapper,
+        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 
0),
+        new CacheConfig(),
+        new DruidHttpClientConfig(),
+        
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        ForkJoinPool.commonPool(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
+    servers = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    segmentGenerator.close();
+  }
+
+  private void addServer(DruidServer server, DataSegment dataSegment, 
QueryableIndex queryableIndex)
+  {
+    servers.add(server);
+    simpleServerView.addServer(server, dataSegment);
+    httpClient.addServerAndRunner(server, new 
SimpleServerManager(conglomerate, dataSegment.getId(), queryableIndex));
+  }
+
+  @Test
+  public void testNoRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(0, queryRunner.getNumTotalRetries());
+    Assert.assertFalse(queryResult.isEmpty());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testRetryForMovedSegment()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, true),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(1, queryRunner.getNumTotalRetries());

Review comment:
       Is there a guarantee that there will be 1 retry? Why won't the 
correct, new server for the segment be selected the first time?

##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
+import org.apache.druid.java.util.common.guava.MergeSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.segment.SegmentMissingException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.BiFunction;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+  private static final Logger LOG = new Logger(RetryQueryRunner.class);
+
+  private final QueryRunner<T> baseRunner;
+  private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn;
+  private final RetryQueryRunnerConfig config;
+  private final ObjectMapper jsonMapper;
+
+  private int numTotalRetries;
+
+  public RetryQueryRunner(
+      QueryRunner<T> baseRunner,
+      BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn,
+      RetryQueryRunnerConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this.baseRunner = baseRunner;
+    this.retryRunnerCreateFn = retryRunnerCreateFn;
+    this.config = config;
+    this.jsonMapper = jsonMapper;
+  }
+
+  @VisibleForTesting
+  int getNumTotalRetries()
+  {
+    return numTotalRetries;
+  }
+
+  @Override
+  public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext 
context)
+  {
+    final Sequence<T> baseSequence = baseRunner.run(queryPlus, context);
+    return new YieldingSequenceBase<T>()
+    {
+      @Override
+      public <OutType> Yielder<OutType> toYielder(OutType initValue, 
YieldingAccumulator<OutType, T> accumulator)
+      {
+        final Sequence<Sequence<T>> retryingSequence = new BaseSequence<>(
+            new IteratorMaker<Sequence<T>, RetryingSequenceIterator>()
+            {
+              @Override
+              public RetryingSequenceIterator make()
+              {
+                return new RetryingSequenceIterator(queryPlus, context, 
baseSequence);
+              }
+
+              @Override
+              public void cleanup(RetryingSequenceIterator iterFromMake)
+              {
+                numTotalRetries = iterFromMake.retryCount;
+              }
+            }
+        );
+        return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), 
retryingSequence)
+            .toYielder(initValue, accumulator);
+      }
+    };
+  }
+
+  private List<SegmentDescriptor> getMissingSegments(final ResponseContext 
context)
+  {
+    // Sanity check before retrieving missingSegments from responseContext.
+    // The missingSegments in the responseContext is only valid when all 
servers have responded to the broker.
+    // The remainingResponses must be not null but 0 in the responseContext at 
this point.
+    final int remainingResponses = Preconditions.checkNotNull(
+        (Integer) context.get(Key.REMAINING_RESPONSES_FROM_QUERY_NODES),
+        "%s in responseContext",
+        Key.REMAINING_RESPONSES_FROM_QUERY_NODES.getName()
+    );
+    if (remainingResponses > 0) {
+      throw new ISE("Failed to check missing segments due to missing responds 
from [%d] servers", remainingResponses);
+    }
+
+    final Object maybeMissingSegments = 
context.get(ResponseContext.Key.MISSING_SEGMENTS);
+    if (maybeMissingSegments == null) {
+      return Collections.emptyList();
+    }
+
+    return jsonMapper.convertValue(
+        maybeMissingSegments,
+        new TypeReference<List<SegmentDescriptor>>()
+        {
+        }
+    );
+  }
+
+  /**
+   * A lazy iterator populating {@link Sequence} by retrying the query. The 
first returned sequence is always the base
+   * sequence given in the constructor. Subsequent sequences are created 
dynamically whenever it retries the query. All
+   * the sequences populated by this iterator will be merged (not combined) 
with the base sequence.
+   *
+   * The design of this iterator depends on how {@link MergeSequence} works; 
the MergeSequence pops an item from
+   * each underlying sequence and pushes them to a {@link 
java.util.PriorityQueue}. Whenever it pops from the queue,
+   * it pushes a new item from the sequence where the returned item was 
originally from. Since the first returned
+   * sequence from this iterator is always the base sequence, the 
MergeSequence will call {@link Sequence#toYielder}
+   * on the base sequence first which in turn initializing query distribution 
tree. Once this tree is built, the query
+   * nodes (historicals and realtime tasks) will lock all segments to read and 
report missing segments to the broker.
+   * If there are missing segments reported, this iterator will rewrite the 
query with those reported segments and
+   * reissue the rewritten query.
+   *
+   * @see org.apache.druid.client.CachingClusteredClient
+   * @see org.apache.druid.client.DirectDruidClient
+   */
+  private class RetryingSequenceIterator implements Iterator<Sequence<T>>
+  {
+    private final QueryPlus<T> queryPlus;
+    private final ResponseContext context;
+    private Sequence<T> sequence;
+    private int retryCount = 0;
+
+    private RetryingSequenceIterator(QueryPlus<T> queryPlus, ResponseContext 
context, Sequence<T> baseSequence)
+    {
+      this.queryPlus = queryPlus;
+      this.context = context;
+      this.sequence = baseSequence;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (sequence != null) {
+        return true;
+      } else {
+        final List<SegmentDescriptor> missingSegments = 
getMissingSegments(context);
+        if (missingSegments.isEmpty()) {
+          return false;
+        } else if (retryCount >= config.getNumTries()) {
+          if (!config.isReturnPartialResults()) {
+            throw new SegmentMissingException("No results found for 
segments[%s]", missingSegments);
+          } else {
+            return false;
+          }
+        } else {
+          LOG.info("[%,d] missing segments found. Retry attempt [%,d]", 
missingSegments.size(), retryCount++);

Review comment:
       Should be `++retryCount`, right? (You would want the first retry to say 
"Retry attempt [1]".)
   
   However, prefix and postfix incrementing in log messages can make things 
harder to read, so IMO it'd be better to split this up into two separate lines.

##########
File path: 
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -766,7 +766,6 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
                    .withDruidCluster(cluster)
                    .withLoadManagementPeons(loadManagementPeons)
                    .withSegmentReplicantLookup(segmentReplicantLookup)
-                   .withBalancerReferenceTimestamp(DateTimes.nowUtc())

Review comment:
       Is this change related?

##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest
+{
+  private static final Closer CLOSER = Closer.create();
+  private static final String DATASOURCE = "datasource";
+  private static final GeneratorSchemaInfo SCHEMA_INFO = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final QueryToolChestWarehouse toolChestWarehouse;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+
+  private SegmentGenerator segmentGenerator;
+  private TestHttpClient httpClient;
+  private SimpleServerView simpleServerView;
+  private CachingClusteredClient cachingClusteredClient;
+  private List<DruidServer> servers;
+
+  public RetryQueryRunnerTest()
+  {
+    conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, 
USE_PARALLEL_MERGE_POOL_CONFIGURED);
+
+    toolChestWarehouse = new QueryToolChestWarehouse()
+    {
+      @Override
+      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> 
getToolChest(final QueryType query)
+      {
+        return conglomerate.findFactory(query).getToolchest();
+      }
+    };
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    CLOSER.close();
+  }
+
+  @Before
+  public void setup()
+  {
+    segmentGenerator = new SegmentGenerator();
+    httpClient = new TestHttpClient(objectMapper);
+    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, 
httpClient);
+    cachingClusteredClient = new CachingClusteredClient(
+        toolChestWarehouse,
+        simpleServerView,
+        MapCache.create(0),
+        objectMapper,
+        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 
0),
+        new CacheConfig(),
+        new DruidHttpClientConfig(),
+        
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        ForkJoinPool.commonPool(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
+    servers = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    segmentGenerator.close();
+  }
+
+  private void addServer(DruidServer server, DataSegment dataSegment, 
QueryableIndex queryableIndex)
+  {
+    servers.add(server);
+    simpleServerView.addServer(server, dataSegment);
+    httpClient.addServerAndRunner(server, new 
SimpleServerManager(conglomerate, dataSegment.getId(), queryableIndex));
+  }
+
+  @Test
+  public void testNoRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(0, queryRunner.getNumTotalRetries());
+    Assert.assertFalse(queryResult.isEmpty());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testRetryForMovedSegment()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, true),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(1, queryRunner.getNumTotalRetries());
+    // Note that we dropped a segment from a server, but it's still announced 
in the server view.
+    // As a result, we may get the full result or not depending on what server 
will get the retry query.
+    // If we hit the same server, the query will return incomplete result.
+    Assert.assertTrue(queryResult.size() > 8);
+    Assert.assertEquals(expectedTimeseriesResult(queryResult.size()), 
queryResult);
+  }
+
+  @Test
+  public void testRetryUntilWeGetFullResult()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(100, false), // retry up to 100
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertTrue(0 < queryRunner.getNumTotalRetries());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testFailWithPartialResultsAfterRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    dropSegmentFromServer(servers.get(0));
+
+    expectedException.expect(SegmentMissingException.class);
+    expectedException.expectMessage("No results found for segments");
+    try {
+      sequence.toList();
+    }
+    finally {
+      Assert.assertEquals(1, queryRunner.getNumTotalRetries());
+    }
+  }
+
+  private void prepareCluster(int numServers)
+  {
+    for (int i = 0; i < numServers; i++) {
+      final DataSegment segment = newSegment(SCHEMA_INFO.getDataInterval(), i);
+      addServer(
+          SimpleServerView.createServer(i + 1),
+          segment,
+          segmentGenerator.generate(segment, SCHEMA_INFO, Granularities.NONE, 
10)
+      );
+    }
+  }
+
+  private Pair<SegmentId, QueryableIndex> dropSegmentFromServer(DruidServer 
fromServer)

Review comment:
       Could you add some javadocs to these methods explaining what they do?
   
   It's non-obvious that `dropSegmentFromServer` doesn't modify the server 
view, and that `dropSegmentFromServerAndAddNewServerForSegment` _does_ modify 
the server view, but only for the new server, not the old one.
   
   Anywhere there is a lack of symmetry and obviousness like this, doc comments 
are especially important.

##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest
+{
+  private static final Closer CLOSER = Closer.create();
+  private static final String DATASOURCE = "datasource";
+  private static final GeneratorSchemaInfo SCHEMA_INFO = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final QueryToolChestWarehouse toolChestWarehouse;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+
+  private SegmentGenerator segmentGenerator;
+  private TestHttpClient httpClient;
+  private SimpleServerView simpleServerView;
+  private CachingClusteredClient cachingClusteredClient;
+  private List<DruidServer> servers;
+
+  public RetryQueryRunnerTest()
+  {
+    conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, 
USE_PARALLEL_MERGE_POOL_CONFIGURED);
+
+    toolChestWarehouse = new QueryToolChestWarehouse()
+    {
+      @Override
+      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> 
getToolChest(final QueryType query)
+      {
+        return conglomerate.findFactory(query).getToolchest();
+      }
+    };
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    CLOSER.close();
+  }
+
+  @Before
+  public void setup()
+  {
+    segmentGenerator = new SegmentGenerator();
+    httpClient = new TestHttpClient(objectMapper);
+    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, 
httpClient);
+    cachingClusteredClient = new CachingClusteredClient(
+        toolChestWarehouse,
+        simpleServerView,
+        MapCache.create(0),
+        objectMapper,
+        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 
0),
+        new CacheConfig(),
+        new DruidHttpClientConfig(),
+        
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        ForkJoinPool.commonPool(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
+    servers = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    segmentGenerator.close();
+  }
+
+  private void addServer(DruidServer server, DataSegment dataSegment, 
QueryableIndex queryableIndex)
+  {
+    servers.add(server);
+    simpleServerView.addServer(server, dataSegment);
+    httpClient.addServerAndRunner(server, new 
SimpleServerManager(conglomerate, dataSegment.getId(), queryableIndex));
+  }
+
+  @Test
+  public void testNoRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(0, queryRunner.getNumTotalRetries());
+    Assert.assertFalse(queryResult.isEmpty());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testRetryForMovedSegment()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, true),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(1, queryRunner.getNumTotalRetries());
+    // Note that we dropped a segment from a server, but it's still announced 
in the server view.
+    // As a result, we may get the full result or not depending on what server 
will get the retry query.
+    // If we hit the same server, the query will return incomplete result.
+    Assert.assertTrue(queryResult.size() > 8);
+    Assert.assertEquals(expectedTimeseriesResult(queryResult.size()), 
queryResult);
+  }
+
+  @Test
+  public void testRetryUntilWeGetFullResult()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(100, false), // retry up to 100
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertTrue(0 < queryRunner.getNumTotalRetries());

Review comment:
       Is there a guarantee that there will be more than zero retries? Why 
won't the correct, new server for the segment be selected the first time?

##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest
+{
+  private static final Closer CLOSER = Closer.create();
+  private static final String DATASOURCE = "datasource";
+  private static final GeneratorSchemaInfo SCHEMA_INFO = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final QueryToolChestWarehouse toolChestWarehouse;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+
+  private SegmentGenerator segmentGenerator;
+  private TestHttpClient httpClient;
+  private SimpleServerView simpleServerView;
+  private CachingClusteredClient cachingClusteredClient;
+  private List<DruidServer> servers;
+
+  public RetryQueryRunnerTest()
+  {
+    conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, 
USE_PARALLEL_MERGE_POOL_CONFIGURED);
+
+    toolChestWarehouse = new QueryToolChestWarehouse()
+    {
+      @Override
+      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> 
getToolChest(final QueryType query)
+      {
+        return conglomerate.findFactory(query).getToolchest();
+      }
+    };
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    CLOSER.close();
+  }
+
+  @Before
+  public void setup()
+  {
+    segmentGenerator = new SegmentGenerator();
+    httpClient = new TestHttpClient(objectMapper);
+    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, 
httpClient);
+    cachingClusteredClient = new CachingClusteredClient(
+        toolChestWarehouse,
+        simpleServerView,
+        MapCache.create(0),
+        objectMapper,
+        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 
0),
+        new CacheConfig(),
+        new DruidHttpClientConfig(),
+        
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        ForkJoinPool.commonPool(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
+    servers = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    segmentGenerator.close();
+  }
+
+  private void addServer(DruidServer server, DataSegment dataSegment, 
QueryableIndex queryableIndex)
+  {
+    servers.add(server);
+    simpleServerView.addServer(server, dataSegment);
+    httpClient.addServerAndRunner(server, new 
SimpleServerManager(conglomerate, dataSegment.getId(), queryableIndex));
+  }
+
+  @Test
+  public void testNoRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(0, queryRunner.getNumTotalRetries());
+    Assert.assertFalse(queryResult.isEmpty());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testRetryForMovedSegment()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, true),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(1, queryRunner.getNumTotalRetries());
+    // Note that we dropped a segment from a server, but it's still announced 
in the server view.
+    // As a result, we may get the full result or not depending on what server 
will get the retry query.
+    // If we hit the same server, the query will return incomplete result.
+    Assert.assertTrue(queryResult.size() > 8);

Review comment:
       Could we use exact sizes here? (If we can't be sure what we'll get, 
maybe check that the size is one of two expected values, and run the test 
repeatedly, at least some minimum number of times, to verify that we actually get both.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to