jihoonson commented on a change in pull request #10082:
URL: https://github.com/apache/druid/pull/10082#discussion_r447336897



##########
File path: server/src/test/java/org/apache/druid/query/RetryQueryRunnerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.CachingClusteredClient;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.SimpleServerView;
+import org.apache.druid.client.TestHttpClient;
+import org.apache.druid.client.TestHttpClient.SimpleServerManager;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.cache.ForegroundCachePopulator;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.guice.http.DruidHttpClientConfig;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SegmentMissingException;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.generator.SegmentGenerator;
+import org.apache.druid.server.QueryStackTests;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class RetryQueryRunnerTest
+{
+  private static final Closer CLOSER = Closer.create();
+  private static final String DATASOURCE = "datasource";
+  private static final GeneratorSchemaInfo SCHEMA_INFO = 
GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
+  private static final boolean USE_PARALLEL_MERGE_POOL_CONFIGURED = false;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private final ObjectMapper objectMapper = new DefaultObjectMapper();
+  private final QueryToolChestWarehouse toolChestWarehouse;
+  private final QueryRunnerFactoryConglomerate conglomerate;
+
+  private SegmentGenerator segmentGenerator;
+  private TestHttpClient httpClient;
+  private SimpleServerView simpleServerView;
+  private CachingClusteredClient cachingClusteredClient;
+  private List<DruidServer> servers;
+
+  public RetryQueryRunnerTest()
+  {
+    conglomerate = 
QueryStackTests.createQueryRunnerFactoryConglomerate(CLOSER, 
USE_PARALLEL_MERGE_POOL_CONFIGURED);
+
+    toolChestWarehouse = new QueryToolChestWarehouse()
+    {
+      @Override
+      public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> 
getToolChest(final QueryType query)
+      {
+        return conglomerate.findFactory(query).getToolchest();
+      }
+    };
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    CLOSER.close();
+  }
+
+  @Before
+  public void setup()
+  {
+    segmentGenerator = new SegmentGenerator();
+    httpClient = new TestHttpClient(objectMapper);
+    simpleServerView = new SimpleServerView(toolChestWarehouse, objectMapper, 
httpClient);
+    cachingClusteredClient = new CachingClusteredClient(
+        toolChestWarehouse,
+        simpleServerView,
+        MapCache.create(0),
+        objectMapper,
+        new ForegroundCachePopulator(objectMapper, new CachePopulatorStats(), 
0),
+        new CacheConfig(),
+        new DruidHttpClientConfig(),
+        
QueryStackTests.getProcessingConfig(USE_PARALLEL_MERGE_POOL_CONFIGURED),
+        ForkJoinPool.commonPool(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
+    servers = new ArrayList<>();
+  }
+
+  @After
+  public void tearDown() throws IOException
+  {
+    segmentGenerator.close();
+  }
+
+  private void addServer(DruidServer server, DataSegment dataSegment, 
QueryableIndex queryableIndex)
+  {
+    servers.add(server);
+    simpleServerView.addServer(server, dataSegment);
+    httpClient.addServerAndRunner(server, new 
SimpleServerManager(conglomerate, dataSegment.getId(), queryableIndex));
+  }
+
+  @Test
+  public void testNoRetry()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, false),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(0, queryRunner.getNumTotalRetries());
+    Assert.assertFalse(queryResult.isEmpty());
+    Assert.assertEquals(expectedTimeseriesResult(10), queryResult);
+  }
+
+  @Test
+  public void testRetryForMovedSegment()
+  {
+    prepareCluster(10);
+    final TimeseriesQuery query = 
timeseriesQuery(SCHEMA_INFO.getDataInterval());
+    final RetryQueryRunner<Result<TimeseriesResultValue>> queryRunner = 
createQueryRunner(
+        newRetryQueryRunnerConfig(1, true),
+        query
+    );
+    final Sequence<Result<TimeseriesResultValue>> sequence = 
queryRunner.run(QueryPlus.wrap(query));
+
+    // Let's move a segment
+    dropSegmentFromServerAndAddNewServerForSegment(servers.get(0));
+
+    final List<Result<TimeseriesResultValue>> queryResult = sequence.toList();
+    Assert.assertEquals(1, queryRunner.getNumTotalRetries());

Review comment:
       > Is deferring `baseRunner.run()` going to have any unintended 
consequences? For example, is it going to meaningfully delay initiation of 
connections to historicals? I'm not sure, but in general this area of the 
system is sensitive to when exactly things get called, so it would be good to 
look into this when changing it.
   
   Good question. It does defer initiating connections to query servers if the 
broker hasn't initiated them yet. AFAICT, there don't seem to be any obvious 
unintended side effects. [All these query runners running on top of 
`RetryQueryRunner` are not aware that the query will be run 
remotely](https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java#L393-L425).

##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
+import org.apache.druid.java.util.common.guava.MergeSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.segment.SegmentMissingException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+  private static final Logger LOG = new Logger(RetryQueryRunner.class);
+
+  private final QueryRunner<T> baseRunner;
+  private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn;
+  private final RetryQueryRunnerConfig config;
+  private final ObjectMapper jsonMapper;
+
+  /**
+   * Runnable executed after the broker creates query distribution tree for 
the first attempt. This is only
+   * for testing and must not be used in production code.
+   */
+  private final Runnable runnableAfterFirstAttempt;
+
+  private int totalNumRetries;
+
+  public RetryQueryRunner(
+      QueryRunner<T> baseRunner,
+      BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn,
+      RetryQueryRunnerConfig config,
+      ObjectMapper jsonMapper
+  )
+  {
+    this(baseRunner, retryRunnerCreateFn, config, jsonMapper, () -> {});
+  }
+
+  /**
+   * Constructor only for testing.
+   */
+  @VisibleForTesting
+  RetryQueryRunner(
+      QueryRunner<T> baseRunner,
+      BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> 
retryRunnerCreateFn,
+      RetryQueryRunnerConfig config,
+      ObjectMapper jsonMapper,
+      Runnable runnableAfterFirstAttempt
+  )
+  {
+    this.baseRunner = baseRunner;
+    this.retryRunnerCreateFn = retryRunnerCreateFn;
+    this.config = config;
+    this.jsonMapper = jsonMapper;
+    this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
+  }
+
+  @VisibleForTesting
+  int getTotalNumRetries()
+  {
+    return totalNumRetries;
+  }
+
+  @Override
+  public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext 
context)
+  {
+    return new YieldingSequenceBase<T>()
+    {
+      @Override
+      public <OutType> Yielder<OutType> toYielder(OutType initValue, 
YieldingAccumulator<OutType, T> accumulator)
+      {
+        final Sequence<Sequence<T>> retryingSequence = new BaseSequence<>(
+            new IteratorMaker<Sequence<T>, RetryingSequenceIterator>()
+            {
+              @Override
+              public RetryingSequenceIterator make()
+              {
+                return new RetryingSequenceIterator(queryPlus, context, 
baseRunner, runnableAfterFirstAttempt);
+              }
+
+              @Override
+              public void cleanup(RetryingSequenceIterator iterFromMake)
+              {
+                totalNumRetries = iterFromMake.retryCount;
+              }
+            }
+        );
+        return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), 
retryingSequence)
+            .toYielder(initValue, accumulator);
+      }
+    };
+  }
+
+  private List<SegmentDescriptor> getMissingSegments(QueryPlus<T> queryPlus, 
final ResponseContext context)
+  {
+    // Sanity check before retrieving missingSegments from responseContext.
+    // The missingSegments in the responseContext is only valid when all 
servers have responded to the broker.
+    // The remainingResponses must be not null but 0 in the responseContext at 
this point.
+    final ConcurrentHashMap<String, Integer> idToRemainingResponses =
+        (ConcurrentHashMap<String, Integer>) Preconditions.checkNotNull(
+            context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS),
+            "%s in responseContext",
+            Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
+        );
+
+    final int remainingResponses = Preconditions.checkNotNull(
+        idToRemainingResponses.get(queryPlus.getQuery().getMostRelevantId()),
+        "Number of remaining responses for query[%s]",
+        queryPlus.getQuery().getMostRelevantId()
+    );
+    if (remainingResponses > 0) {
+      throw new ISE("Failed to check missing segments due to missing responds 
from [%d] servers", remainingResponses);
+    }
+
+    final Object maybeMissingSegments = 
context.get(ResponseContext.Key.MISSING_SEGMENTS);
+    if (maybeMissingSegments == null) {
+      return Collections.emptyList();
+    }
+
+    return jsonMapper.convertValue(
+        maybeMissingSegments,
+        new TypeReference<List<SegmentDescriptor>>()
+        {
+        }
+    );
+  }
+
+  /**
+   * A lazy iterator populating {@link Sequence} by retrying the query. The 
first returned sequence is always the base
+   * sequence from the baseQueryRunner. Subsequent sequences are created 
dynamically whenever it retries the query. All
+   * the sequences populated by this iterator will be merged (not combined) 
with the base sequence.
+   *
+   * The design of this iterator depends on how {@link MergeSequence} works; 
the MergeSequence pops an item from
+   * each underlying sequence and pushes them to a {@link 
java.util.PriorityQueue}. Whenever it pops from the queue,
+   * it pushes a new item from the sequence where the returned item was 
originally from. Since the first returned
+   * sequence from this iterator is always the base sequence, the 
MergeSequence will call {@link Sequence#toYielder}
+   * on the base sequence first which in turn initializing query distribution 
tree. Once this tree is built, the query
+   * servers (historicals and realtime tasks) will lock all segments to read 
and report missing segments to the broker.
+   * If there are missing segments reported, this iterator will rewrite the 
query with those reported segments and
+   * reissue the rewritten query.
+   *
+   * @see org.apache.druid.client.CachingClusteredClient
+   * @see org.apache.druid.client.DirectDruidClient
+   */
+  private class RetryingSequenceIterator implements Iterator<Sequence<T>>
+  {
+    private final QueryPlus<T> queryPlus;
+    private final ResponseContext context;
+    private final QueryRunner<T> baseQueryRunner;
+    private final Runnable runnableAfterFirstAttempt;
+
+    private boolean first = true;
+    private Sequence<T> sequence = null;
+    private int retryCount = 0;
+
+    private RetryingSequenceIterator(
+        QueryPlus<T> queryPlus,
+        ResponseContext context,
+        QueryRunner<T> baseQueryRunner,
+        Runnable runnableAfterFirstAttempt
+    )
+    {
+      this.queryPlus = queryPlus;
+      this.context = context;
+      this.baseQueryRunner = baseQueryRunner;
+      this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      if (first) {
+        sequence = baseQueryRunner.run(queryPlus, context);
+        // runnableAfterFirstAttempt is only for testing, it must be no-op for 
production code.
+        runnableAfterFirstAttempt.run();

Review comment:
       I agree with @gianm. I'm more worried that the tests may end up testing 
different code than production if we forget to keep this part in sync when we 
modify it later.

##########
File path: processing/src/main/java/org/apache/druid/query/Query.java
##########
@@ -154,6 +161,17 @@ default String getSqlQueryId()
     return null;
   }
 
+  /**
+   * Returns a most relevant ID of this query; if it is a subquery, this will 
return its subquery ID.
+   * If it is a regular query without subqueries, this will return its query 
ID.
+   * This method should be called after the relevant ID is assigned using 
{@link #withId} or {@link #withSubQueryId}.
+   */
+  default String getMostRelevantId()

Review comment:
       Renamed as suggested and added a debug log.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to