gianm commented on a change in pull request #10082:
URL: https://github.com/apache/druid/pull/10082#discussion_r448076264
##########
File path: processing/src/main/java/org/apache/druid/query/context/ResponseContext.java
##########
@@ -112,6 +114,30 @@
"uncoveredIntervalsOverflowed",
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
),
+ /**
+ * Map of most relevant query ID to the remaining number of responses from query nodes.
+ * The value is initialized in {@code CachingClusteredClient} when it initializes the connection to the query nodes,
+ * and is updated whenever they respond ({@code DirectDruidClient}). {@code RetryQueryRunner} uses this value to
+ * check whether {@link #MISSING_SEGMENTS} is valid.
+ *
+ * Currently, the broker doesn't run subqueries in parallel, so the remaining number of responses is updated
+ * one by one per subquery. However, since we are planning to parallelize running subqueries, we store them
Review comment:
> we are planning to parallelize running subqueries
I'm not sure if we are, but we also haven't decided definitely _not_ to.
I think it would also be okay not to handle parallel queries here, but
instead build in a sanity check that verifies the subqueries are issued in
series. Maybe by verifying that, when the id changes, the previous id's
number of remaining responses has dropped to zero. It's up to you. I think
your current code is okay too.
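The suggested sanity check could look roughly like the following standalone sketch. All names here (`SerialSubqueryCheck`, `registerResponses`, `onResponse`) are hypothetical stand-ins, not Druid APIs; the real version would hang off the `REMAINING_RESPONSES_FROM_QUERY_SERVERS` map in the response context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: subqueries are expected to run in series, so when the
// most-specific query id changes, the previous id's remaining-response count
// must already have drained to zero. None of these names are from Druid.
class SerialSubqueryCheck
{
  private final Map<String, Integer> remainingResponses = new HashMap<>();
  private String currentId = null;

  void registerResponses(String queryId, int expectedResponses)
  {
    if (currentId != null && !currentId.equals(queryId)) {
      final int prevRemaining = remainingResponses.getOrDefault(currentId, 0);
      if (prevRemaining != 0) {
        // Subqueries were supposed to run in series; a nonzero count here
        // means a new subquery started before the previous one finished.
        throw new IllegalStateException(
            "Subquery id changed from [" + currentId + "] to [" + queryId
            + "] while [" + prevRemaining + "] responses were still outstanding"
        );
      }
    }
    currentId = queryId;
    remainingResponses.put(queryId, expectedResponses);
  }

  void onResponse(String queryId)
  {
    // Decrement the counter as each query node responds.
    remainingResponses.merge(queryId, -1, Integer::sum);
  }

  int remaining(String queryId)
  {
    return remainingResponses.getOrDefault(queryId, 0);
  }
}
```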
##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
+import org.apache.druid.java.util.common.guava.MergeSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.segment.SegmentMissingException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+ private static final Logger LOG = new Logger(RetryQueryRunner.class);
+
+ private final QueryRunner<T> baseRunner;
+ private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn;
+ private final RetryQueryRunnerConfig config;
+ private final ObjectMapper jsonMapper;
+
+ /**
+ * Runnable executed after the broker creates the query distribution tree for the first attempt. This is only
+ * for testing and must not be used in production code.
+ */
+ private final Runnable runnableAfterFirstAttempt;
+
+ private int totalNumRetries;
+
+ public RetryQueryRunner(
+ QueryRunner<T> baseRunner,
+ BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn,
+ RetryQueryRunnerConfig config,
+ ObjectMapper jsonMapper
+ )
+ {
+ this(baseRunner, retryRunnerCreateFn, config, jsonMapper, () -> {});
+ }
+
+ /**
+ * Constructor only for testing.
+ */
+ @VisibleForTesting
RetryQueryRunner(
+ QueryRunner<T> baseRunner,
+ BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>> retryRunnerCreateFn,
+ RetryQueryRunnerConfig config,
+ ObjectMapper jsonMapper,
+ Runnable runnableAfterFirstAttempt
+ )
+ {
+ {
+ this.baseRunner = baseRunner;
+ this.retryRunnerCreateFn = retryRunnerCreateFn;
+ this.config = config;
+ this.jsonMapper = jsonMapper;
+ this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
+ }
+
+ @VisibleForTesting
+ int getTotalNumRetries()
+ {
+ return totalNumRetries;
+ }
+
+ @Override
+ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext context)
+ {
+ return new YieldingSequenceBase<T>()
+ {
+ @Override
+ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
+ {
+ final Sequence<Sequence<T>> retryingSequence = new BaseSequence<>(
+ new IteratorMaker<Sequence<T>, RetryingSequenceIterator>()
+ {
+ @Override
+ public RetryingSequenceIterator make()
+ {
+ return new RetryingSequenceIterator(queryPlus, context, baseRunner, runnableAfterFirstAttempt);
+ }
+
+ @Override
+ public void cleanup(RetryingSequenceIterator iterFromMake)
+ {
+ totalNumRetries = iterFromMake.retryCount;
+ }
+ }
+ );
+ return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(), retryingSequence)
+ .toYielder(initValue, accumulator);
+ }
+ };
+ }
+
+ private List<SegmentDescriptor> getMissingSegments(QueryPlus<T> queryPlus, final ResponseContext context)
+ {
+ // Sanity check before retrieving missingSegments from responseContext.
+ // The missingSegments in the responseContext is only valid when all servers have responded to the broker.
+ // The remainingResponses in the responseContext must be non-null and 0 at this point.
+ final ConcurrentHashMap<String, Integer> idToRemainingResponses =
+ (ConcurrentHashMap<String, Integer>) Preconditions.checkNotNull(
+ context.get(Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS),
+ "%s in responseContext",
+ Key.REMAINING_RESPONSES_FROM_QUERY_SERVERS.getName()
+ );
+
+ final int remainingResponses = Preconditions.checkNotNull(
+ idToRemainingResponses.get(queryPlus.getQuery().getMostSpecificId()),
+ "Number of remaining responses for query[%s]",
+ queryPlus.getQuery().getMostSpecificId()
+ );
+ if (remainingResponses > 0) {
+ throw new ISE("Failed to check missing segments due to missing responds from [%d] servers", remainingResponses);
Review comment:
If this happens, is it a bug? Or might it happen for some legitimate
reason?
If it is a bug: please include a comment that this message means there was a
bug. (So people who get the message and search for it in the code will see
that it is a sign of a bug.)
If there could be a legitimate reason: in that case we should improve the
error message to help the user understand what the legitimate reason might be.
(A nit: spelling: it should be "missing responses" rather than "missing
responds".)
##########
File path: server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
##########
@@ -329,15 +329,18 @@ private DataSource inlineIfNecessary(
}
} else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
// Subquery needs to be inlined. Assign it a subquery id and run it.
- final Query subQueryWithId = subQuery.withSubQueryId(UUID.randomUUID().toString());
+ final Query subQueryWithId = subQuery.withDefaultSubQueryId();
final Sequence<?> queryResults;
if (dryRun) {
queryResults = Sequences.empty();
} else {
final QueryRunner subqueryRunner = subQueryWithId.getRunner(this);
- queryResults = subqueryRunner.run(QueryPlus.wrap(subQueryWithId));
+ queryResults = subqueryRunner.run(
+ QueryPlus.wrap(subQueryWithId),
+ DirectDruidClient.makeResponseContextForQuery()
Review comment:
Oops, not including a context here was a mistake, thanks for fixing it.
Now that I think about it, though, making a new context here will mean we
aren't going to properly return context from subqueries up to the original
caller. This includes not reporting missing segments in the case where
`RetryQueryRunnerConfig.isReturnPartialResults = true`.
It would be better to share the context that was created in QueryLifecycle.
Is it feasible to do this? Maybe by moving some of this logic to be lazy and
happen inside the returned QueryRunner? (It will get a copy of the context.)
Btw, this sounds like it might be tough to do, so we could also address it
with documentation about known limitations. But I think we either need to fix
it, or document it.
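The lazy approach could be sketched roughly as below. This is a deliberately simplified standalone illustration: the `BiFunction` plays the role of a `QueryRunner`, and the plain `Map` plays the role of the `ResponseContext` that `QueryLifecycle` would create and share; none of these names are the real Druid types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical sketch of the "make it lazy" idea: instead of running the
// subquery eagerly with a freshly created context, capture the work in a
// runner-like function and execute it with whatever context the caller
// passes in, so subquery context entries (e.g. missing segments) accumulate
// in the caller's shared context rather than being dropped.
class LazySubquery
{
  // Returns a "runner": given the query and the caller's shared context,
  // it runs the subquery and records context entries into that context.
  static BiFunction<String, Map<String, Object>, String> inlineLazily(String subqueryId)
  {
    return (query, sharedContext) -> {
      // Executed only when the outer runner runs, with the shared context.
      sharedContext.merge(
          "missingSegments:" + subqueryId,
          "seg1",
          (oldVal, newVal) -> oldVal + "," + newVal
      );
      return "results-of-" + query;
    };
  }
}
```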
##########
File path: server/src/main/java/org/apache/druid/query/RetryQueryRunner.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.BaseSequence.IteratorMaker;
+import org.apache.druid.java.util.common.guava.MergeSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.YieldingAccumulator;
+import org.apache.druid.java.util.common.guava.YieldingSequenceBase;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.context.ResponseContext.Key;
+import org.apache.druid.segment.SegmentMissingException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+
+public class RetryQueryRunner<T> implements QueryRunner<T>
+{
+ private static final Logger LOG = new Logger(RetryQueryRunner.class);
+
+ private final QueryRunner<T> baseRunner;
+ private final BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>>
retryRunnerCreateFn;
+ private final RetryQueryRunnerConfig config;
+ private final ObjectMapper jsonMapper;
+
+ /**
+ * Runnable executed after the broker creates query distribution tree for
the first attempt. This is only
+ * for testing and must not be used in production code.
+ */
+ private final Runnable runnableAfterFirstAttempt;
+
+ private int totalNumRetries;
+
+ public RetryQueryRunner(
+ QueryRunner<T> baseRunner,
+ BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>>
retryRunnerCreateFn,
+ RetryQueryRunnerConfig config,
+ ObjectMapper jsonMapper
+ )
+ {
+ this(baseRunner, retryRunnerCreateFn, config, jsonMapper, () -> {});
+ }
+
+ /**
+ * Constructor only for testing.
+ */
+ @VisibleForTesting
+ RetryQueryRunner(
+ QueryRunner<T> baseRunner,
+ BiFunction<Query<T>, List<SegmentDescriptor>, QueryRunner<T>>
retryRunnerCreateFn,
+ RetryQueryRunnerConfig config,
+ ObjectMapper jsonMapper,
+ Runnable runnableAfterFirstAttempt
+ )
+ {
+ this.baseRunner = baseRunner;
+ this.retryRunnerCreateFn = retryRunnerCreateFn;
+ this.config = config;
+ this.jsonMapper = jsonMapper;
+ this.runnableAfterFirstAttempt = runnableAfterFirstAttempt;
+ }
+
+ @VisibleForTesting
+ int getTotalNumRetries()
+ {
+ return totalNumRetries;
+ }
+
+ @Override
+ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext
context)
+ {
+ return new YieldingSequenceBase<T>()
+ {
+ @Override
+ public <OutType> Yielder<OutType> toYielder(OutType initValue,
YieldingAccumulator<OutType, T> accumulator)
+ {
+ final Sequence<Sequence<T>> retryingSequence = new BaseSequence<>(
+ new IteratorMaker<Sequence<T>, RetryingSequenceIterator>()
+ {
+ @Override
+ public RetryingSequenceIterator make()
+ {
+ return new RetryingSequenceIterator(queryPlus, context,
baseRunner, runnableAfterFirstAttempt);
+ }
+
+ @Override
+ public void cleanup(RetryingSequenceIterator iterFromMake)
+ {
+ totalNumRetries = iterFromMake.retryCount;
+ }
+ }
+ );
+ return new MergeSequence<>(queryPlus.getQuery().getResultOrdering(),
retryingSequence)
+ .toYielder(initValue, accumulator);
+ }
+ };
+ }
+
+ private List<SegmentDescriptor> getMissingSegments(QueryPlus<T> queryPlus, final ResponseContext context)
Review comment:
I'm not sure where to put this comment, but I noticed that the
`MISSING_SEGMENTS` array could get truncated by the historical that generated
it; see `ResponseContext.serializeWith`. It looks like this was discussed in
#2331. We can't allow this if we are going to rely on the missing segments list
for query correctness. I think that means we need to introduce an option that
tells the QueryResource that it should throw an error rather than truncate, and
we should always set that option when communicating from the Broker to data
servers.
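The suggested option could behave roughly like this standalone sketch. `ContextSerializer`, `serialize`, and `failOnTruncate` are all hypothetical names for illustration; the real change would live in `ResponseContext.serializeWith` and the flag would be set on Broker-to-data-server requests.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the suggested option: when the serialized response
// context exceeds the header size limit, either truncate (acceptable for
// external callers) or throw (what the Broker would request from data
// servers, since a silently truncated missing-segments list breaks retries).
class ContextSerializer
{
  static String serialize(String json, int maxBytes, boolean failOnTruncate)
  {
    byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
    if (bytes.length <= maxBytes) {
      return json;
    }
    if (failOnTruncate) {
      throw new IllegalStateException(
          "Response context of [" + bytes.length + "] bytes exceeds limit of ["
          + maxBytes + "] bytes"
      );
    }
    // Lossy truncation (assumes ASCII for simplicity): fine for a human
    // reading headers, not fine when correctness depends on the full list.
    return json.substring(0, maxBytes);
  }
}
```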
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]