imply-cheddar commented on code in PR #17354: URL: https://github.com/apache/druid/pull/17354#discussion_r1811123151
########## processing/src/main/java/org/apache/druid/query/QueryExecutor.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +/** + * Executes the query by utilizing the given walker. + */ +public interface QueryExecutor<T> +{ + QueryRunner<T> makeQueryRunner( + Query<T> query, + QuerySegmentWalker walker + ); +} Review Comment: I made this comment elsewhere as well, but what do you think about calling this `QueryLogic` and then the method you are creating is `initialEntryPoint()`? ########## processing/src/main/java/org/apache/druid/query/DefaultQueryRunnerFactoryConglomerate.java: ########## @@ -19,29 +19,53 @@ package org.apache.druid.query; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.google.inject.Inject; - import java.util.IdentityHashMap; import java.util.Map; -/** -*/ public class DefaultQueryRunnerFactoryConglomerate implements QueryRunnerFactoryConglomerate { private final Map<Class<? extends Query>, QueryRunnerFactory> factories; + private final Map<Class<? extends Query>, QueryToolChest> toolchests; - @Inject + @VisibleForTesting Review Comment: Is there a reason why we cannot either update the places that are calling this to pass in an empty list? That or make a public static that is `buildFromFactories()` which can then be part of the public API and doesn't need this annotation. ########## processing/src/main/java/org/apache/druid/query/union/UnionQueryQueryToolChest.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.DefaultQueryMetrics; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryExecutor; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.RowSignature.Finalization; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class UnionQueryQueryToolChest extends QueryToolChest<UnionResult, UnionQuery> + implements QueryExecutor<UnionResult> +{ + @Override + public QueryRunner<UnionResult> makeQueryRunner(Query<UnionResult> query, + QuerySegmentWalker clientQuerySegmentWalker) + { + return new UnionQueryRunner((UnionQuery) query, clientQuerySegmentWalker); + } + + @Override + @SuppressWarnings("unchecked") + public QueryRunner<UnionResult> mergeResults(QueryRunner<UnionResult> runner) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public QueryMetrics<? super UnionQuery> makeMetrics(UnionQuery query) + { + return new DefaultQueryMetrics<>(); + } + + @Override + public Function<UnionResult, UnionResult> makePreComputeManipulatorFn( + UnionQuery query, + MetricManipulationFn fn) + { + return Functions.identity(); + } + + @Override + public TypeReference<UnionResult> getResultTypeReference() + { + return null; + } + + @Override + public RowSignature resultArraySignature(UnionQuery query) + { + for (Query<?> q : query.queries) { + RowSignature sig = q.getResultRowSignature(Finalization.UNKNOWN); + if (sig != null) { + return sig; + } + } + throw DruidException.defensive("None of the subqueries support row signature"); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public Sequence<Object[]> resultsAsArrays( + UnionQuery query, + Sequence<UnionResult> resultSequence) + { + return new UnionSequenceMaker<Object[]>() + { + @Override + public Optional<Sequence<Object[]>> transformResults(Query<?> query, Sequence<Object> results) + { + QueryToolChest toolChest = conglomerate.getToolChest(query); + return Optional.of(toolChest.resultsAsArrays(query, results)); + } + }.transform(query, resultSequence).get(); + } Review Comment: The way this works is pretty hard to follow. It's fundamentally working around a poor API definition when the method `resultsAsArrays` was initially created. This PR (https://github.com/apache/druid/pull/16800) starts to provide the `QueryRunner` knowledge of the type of data that it needs with the `serialization` context parameter. We should be able to use that to identify if we want arrays or frames or whatever and make these methods `resultsAsArrays()` and `resultsAsFrames()` to just be identity functions (so that they are fundamentally noops). This means that we should be able to handle the serialization in place when you are actually running the sub queries instead of here in these methods. ########## processing/src/main/java/org/apache/druid/query/union/UnionQueryQueryToolChest.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; +import com.google.common.base.Functions; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.DefaultQueryMetrics; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryExecutor; +import org.apache.druid.query.QueryMetrics; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.aggregation.MetricManipulationFn; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.column.RowSignature.Finalization; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class UnionQueryQueryToolChest extends QueryToolChest<UnionResult, UnionQuery> + implements QueryExecutor<UnionResult> +{ + @Override + public QueryRunner<UnionResult> makeQueryRunner(Query<UnionResult> query, + QuerySegmentWalker clientQuerySegmentWalker) + { + return new UnionQueryRunner((UnionQuery) query, clientQuerySegmentWalker); + } + + @Override + @SuppressWarnings("unchecked") + public QueryRunner<UnionResult> mergeResults(QueryRunner<UnionResult> runner) + { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public QueryMetrics<? super UnionQuery> makeMetrics(UnionQuery query) + { + return new DefaultQueryMetrics<>(); + } + + @Override + public Function<UnionResult, UnionResult> makePreComputeManipulatorFn( + UnionQuery query, + MetricManipulationFn fn) + { + return Functions.identity(); + } + + @Override + public TypeReference<UnionResult> getResultTypeReference() + { + return null; + } + + @Override + public RowSignature resultArraySignature(UnionQuery query) + { + for (Query<?> q : query.queries) { + RowSignature sig = q.getResultRowSignature(Finalization.UNKNOWN); + if (sig != null) { + return sig; + } + } + throw DruidException.defensive("None of the subqueries support row signature"); Review Comment: This would be `InvalidInput` not `defensive`. The idea is that if someone has sent this query, they should be structuring it as something that can be used for Union, so if there wasn't a single query provided that can do that, it's a user-error problem in the queries provided. ########## sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidUnion.java: ########## @@ -66,24 +70,72 @@ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) @Override public SourceDesc getSourceDesc(PlannerContext plannerContext, List<SourceDesc> sources) { - List<DataSource> dataSources = new ArrayList<>(); - RowSignature signature = null; - for (SourceDesc sourceDesc : sources) { - checkDataSourceSupported(sourceDesc.dataSource); - dataSources.add(sourceDesc.dataSource); - if (signature == null) { - signature = sourceDesc.rowSignature; - } else { - if (!signature.equals(sourceDesc.rowSignature)) { - throw DruidException.defensive( - "Row signature mismatch in Union inputs [%s] and [%s]", - signature, - sourceDesc.rowSignature - ); + if (mayUseUnionDataSource(sources)) { + List<DataSource> dataSources = new ArrayList<>(); + RowSignature signature = null; + for (SourceDesc sourceDesc : sources) { + checkDataSourceSupported(sourceDesc.dataSource); + dataSources.add(sourceDesc.dataSource); + if (signature == null) { + signature = sourceDesc.rowSignature; + } else { + if (!signature.equals(sourceDesc.rowSignature)) { + throw DruidException.defensive( + "Row signature mismatch in Union inputs [%s] and [%s]", + signature, + sourceDesc.rowSignature + ); + } + } + } + return new SourceDesc(new UnionDataSource(dataSources), signature); + } + if (mayUseUnionQuery(sources)) { + RowSignature signature = null; + List<Query<?>> queries = new ArrayList<>(); + for (SourceDesc sourceDesc : sources) { + QueryDataSource qds = (QueryDataSource) sourceDesc.dataSource; + queries.add(qds.getQuery()); + if (signature == null) { + signature = sourceDesc.rowSignature; + } else { + if (!signature.equals(sourceDesc.rowSignature)) { + throw DruidException.defensive( + "Row signature mismatch in Union inputs [%s] and [%s]", + signature, + sourceDesc.rowSignature + ); + } } } + return new SourceDesc(new QueryDataSource(new UnionQuery(queries)), signature); + } + + throw DruidException.defensive("XXXOnly Table and Values are supported as inputs for Union [%s]", sources); + } Review Comment: I'm unsure if this is truly a "defensive" exception or if it is indicative of bad user input? Is there a way that a user can do something to cause this exception to get thrown? If we don't expect it to be seen, then a message like "Got an input type [%s] that is not supported. This should not happen". ########## processing/src/main/java/org/apache/druid/query/DefaultQueryRunnerFactoryConglomerate.java: ########## @@ -19,29 +19,53 @@ package org.apache.druid.query; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.google.inject.Inject; - import java.util.IdentityHashMap; import java.util.Map; -/** -*/ public class DefaultQueryRunnerFactoryConglomerate implements QueryRunnerFactoryConglomerate { private final Map<Class<? extends Query>, QueryRunnerFactory> factories; + private final Map<Class<? extends Query>, QueryToolChest> toolchests; - @Inject + @VisibleForTesting public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories) { - // Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap. - // Class doesn't override Object.equals(). + this(factories, Maps.transformValues(factories, f -> f.getToolchest())); + } + + @Inject + public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories, + Map<Class<? extends Query>, QueryToolChest> toolchests) + { this.factories = new IdentityHashMap<>(factories); + this.toolchests = new IdentityHashMap<>(toolchests); } @Override @SuppressWarnings("unchecked") public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query) { - return (QueryRunnerFactory<T, QueryType>) factories.get(query.getClass()); + return factories.get(query.getClass()); + } + + @Override + @SuppressWarnings("unchecked") + public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(QueryType query) + { + return toolchests.get(query.getClass()); + } + + @Override + @SuppressWarnings("unchecked") + public <T, QueryType extends Query<T>> QueryExecutor<T> getQueryExecutor(QueryType query) + { + QueryToolChest<T, QueryType> toolchest = getToolChest(query); + if (toolchest instanceof QueryExecutor) { + return (QueryExecutor<T>) toolchest; + } + return null; } Review Comment: Or we name the interface `QueryLogic` and have a method called `entryPoint`. This would actually align with something else I'm doing as well, so I kinda like that. Especially as the distinction between the ToolChest and the QueryRunnerFactory has basically disappeared as the system has evolved. ########## processing/src/main/java/org/apache/druid/query/DefaultQueryRunnerFactoryConglomerate.java: ########## @@ -19,29 +19,53 @@ package org.apache.druid.query; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.google.inject.Inject; - import java.util.IdentityHashMap; import java.util.Map; -/** -*/ public class DefaultQueryRunnerFactoryConglomerate implements QueryRunnerFactoryConglomerate { private final Map<Class<? extends Query>, QueryRunnerFactory> factories; + private final Map<Class<? extends Query>, QueryToolChest> toolchests; - @Inject + @VisibleForTesting public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories) { - // Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap. - // Class doesn't override Object.equals(). + this(factories, Maps.transformValues(factories, f -> f.getToolchest())); + } + + @Inject + public DefaultQueryRunnerFactoryConglomerate(Map<Class<? extends Query>, QueryRunnerFactory> factories, + Map<Class<? extends Query>, QueryToolChest> toolchests) + { this.factories = new IdentityHashMap<>(factories); + this.toolchests = new IdentityHashMap<>(toolchests); } @Override @SuppressWarnings("unchecked") public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query) { - return (QueryRunnerFactory<T, QueryType>) factories.get(query.getClass()); + return factories.get(query.getClass()); + } + + @Override + @SuppressWarnings("unchecked") + public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(QueryType query) + { + return toolchests.get(query.getClass()); + } + + @Override + @SuppressWarnings("unchecked") + public <T, QueryType extends Query<T>> QueryExecutor<T> getQueryExecutor(QueryType query) + { + QueryToolChest<T, QueryType> toolchest = getToolChest(query); + if (toolchest instanceof QueryExecutor) { + return (QueryExecutor<T>) toolchest; + } + return null; } Review Comment: Naming issue: "QueryExecutor" is extremely generic, the name should more reflect where in the lifecycle the query is being executed, so that it's more clear what this is in charge of. We should also have javadoc describing what this is responsible for. I'd suggest something like, `QueryEntryPointRunner` or something similar to that. ########## processing/src/main/java/org/apache/druid/query/union/UnionQueryRunner.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.context.ResponseContext; + +import java.util.ArrayList; +import java.util.List; + +class UnionQueryRunner implements QueryRunner<UnionResult> +{ + private final QuerySegmentWalker walker; + private final List<QueryRunner> runners; + + public UnionQueryRunner( + UnionQuery query, + QuerySegmentWalker walker + ) + { + this.walker = walker; + this.runners = makeSubQueryRunners(query); + } + + private List<QueryRunner> makeSubQueryRunners(UnionQuery unionQuery) + { + List<QueryRunner> runners = new ArrayList<>(); + for (Query<?> query : unionQuery.queries) { + runners.add(query.getRunner(walker)); + } + return runners; + + } + + @Override + public Sequence<UnionResult> run(QueryPlus<UnionResult> queryPlus, ResponseContext responseContext) + { + UnionQuery unionQuery = queryPlus.unwrapQuery(UnionQuery.class); + + List<UnionResult> seqs = new ArrayList<UnionResult>(); + for (int i = 0; i < runners.size(); i++) { + Query<?> q = unionQuery.queries.get(i); + QueryRunner r = runners.get(i); + seqs.add(makeUnionResult(r, queryPlus.withQuery(q), responseContext)); Review Comment: This is the place to apply the context parameter for the serialization type. While it would be best if the different queries all just understood the context properly, all we need right now is the UnionQuery to understand it. It can check the context parameter here and then apply the correct method to get the right data. ########## server/src/test/java/org/apache/druid/query/QueryRunnerBasedOnClusteredClientTestBase.java: ########## @@ -107,17 +107,9 @@ protected QueryRunnerBasedOnClusteredClientTestBase() { conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate( CLOSER, - () -> TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD + TopNQueryConfig.DEFAULT_MIN_TOPN_THRESHOLD ); - - toolChestWarehouse = new QueryToolChestWarehouse() - { - @Override - public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query) - { - return conglomerate.findFactory(query).getToolchest(); - } - }; + toolChestWarehouse = conglomerate; Review Comment: Just get rid of the `toolChestWarehouse` field entirely? ########## processing/src/main/java/org/apache/druid/query/QueryToolChest.java: ########## @@ -71,6 +73,12 @@ protected QueryToolChest() } } + @Inject + public void setWarehouse(QueryRunnerFactoryConglomerate conglomerate) + { + this.conglomerate = conglomerate; + } + Review Comment: This should be a dependency if it is needed instead of using `setWarehouse()`. ########## server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java: ########## @@ -443,15 +450,23 @@ private <T> DataSource inlineIfNecessary( .toString() ) ); - queryResults = subQueryWithSerialization - .getRunner(this) + + QueryExecutor<Object> subQueryExecutor = conglomerate.getQueryExecutor(subQuery); + final QueryRunner subQueryRunner; + if (subQueryExecutor != null) { + subQueryRunner = subQueryExecutor.makeQueryRunner(subQueryWithSerialization, this); + } else { + subQueryRunner = subQueryWithSerialization.getRunner(this); + } + + queryResults = subQueryRunner Review Comment: This feels like the wrong place to be checking for the Executor. Why can't we do it up around line 386? ########## processing/src/main/java/org/apache/druid/query/union/UnionResult.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import org.apache.druid.java.util.common.guava.Sequence; + +/** + * Holds the resulting Sequence for a union query branch. + * + * Caveat: the index of the ResultUnionResult in the output sequence is in line + * with the index of the executed query. + */ +public class UnionResult +{ + private final Sequence<?> seq; + + public UnionResult(Sequence<?> seq) + { + this.seq = seq; + } + + public <T> Sequence<T> getResults() + { + return (Sequence<T>) seq; + + } + +} Review Comment: If you adjust to use the context parameter, I don't think you need this class anymore. ########## processing/src/main/java/org/apache/druid/query/union/UnionQuery.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Ordering; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.SegmentReference; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnionQuery implements Query<UnionResult> +{ + @JsonProperty("context") + protected final Map<String, Object> context; + + @JsonProperty("queries") + protected final List<Query<?>> queries; + + public UnionQuery(List<Query<?>> queries) + { + this(queries, queries.get(0).getContext()); + } + + @JsonCreator + public UnionQuery( + @JsonProperty("queries") List<Query<?>> queries, + @JsonProperty("context") Map<String, Object> context) + { + Preconditions.checkArgument(queries.size() > 1, "union with fewer than 2 queries makes no sense"); + this.queries = queries; + this.context = context; + } + + @Override + public DataSource getDataSource() + { + throw DruidException.defensive("This is not supported"); + } + + @Override + public List<DataSource> getDataSources() + { + List<DataSource> dataSources = new ArrayList<>(); + for (Query<?> query : queries) { + dataSources.add(query.getDataSource()); + } + return dataSources; + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return getClass().getSimpleName(); + } + + @Override + public QueryRunner<UnionResult> getRunner(QuerySegmentWalker walker) + { + throw DruidException.defensive("Use QueryToolChest to get a Runner"); + } + + @Override + public List<Interval> getIntervals() + { + return Collections.emptyList(); + } + + @Override + public Duration getDuration() + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } + + @Override + public Granularity getGranularity() + { + return Granularities.ALL; + } + + @Override + public DateTimeZone getTimezone() + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } Review Comment: Why do we need the method name in the message? It's part of the exception... Also, your message is perhaps missing a `not`? Are these intended to be "not yet implemented" or "not implemented and should never be implemented"? ########## processing/src/main/java/org/apache/druid/query/Query.java: ########## @@ -285,4 +298,22 @@ default Interval getSingleInterval() ) ); } + + default Query<T> withDataSources(List<DataSource> children) + { + if (children.size() != 1) { + throw new IAE("Must have exactly one child"); + } + return withDataSource(Iterables.getOnlyElement(children)); + } Review Comment: These methods for `getDataSources` and `withDataSources`I would hope that we can eliminate on the interface. They are a leaky abstraction, I *think* that any code that would actually need these would be able to be avoided by implementing the `QueryExecutor` thingie which means that these methods don't need to exist on the interface anymore, right? ########## processing/src/main/java/org/apache/druid/query/union/UnionQuery.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.union; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Ordering; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QuerySegmentWalker; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.segment.SegmentReference; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class UnionQuery implements Query<UnionResult> +{ + @JsonProperty("context") + protected final Map<String, Object> context; + + @JsonProperty("queries") + protected final List<Query<?>> queries; + + public UnionQuery(List<Query<?>> queries) + { + this(queries, queries.get(0).getContext()); + } + + @JsonCreator + public UnionQuery( + @JsonProperty("queries") List<Query<?>> queries, + @JsonProperty("context") Map<String, Object> context) + { + Preconditions.checkArgument(queries.size() > 1, "union with fewer than 2 queries makes no sense"); + this.queries = queries; + this.context = context; + } + + @Override + public DataSource getDataSource() + { + throw DruidException.defensive("This is not supported"); + } + + @Override + public List<DataSource> getDataSources() + { + List<DataSource> dataSources = new ArrayList<>(); + for (Query<?> query : queries) { + dataSources.add(query.getDataSource()); + } + return dataSources; + } + + @Override + public boolean hasFilters() + { + return false; + } + + @Override + public DimFilter getFilter() + { + return null; + } + + @Override + public String getType() + { + return getClass().getSimpleName(); + } + + @Override + public QueryRunner<UnionResult> getRunner(QuerySegmentWalker walker) + { + throw DruidException.defensive("Use QueryToolChest to get a Runner"); + } + + @Override + public List<Interval> getIntervals() + { + return Collections.emptyList(); + } + + @Override + public Duration getDuration() + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } + + @Override + public Granularity getGranularity() + { + return Granularities.ALL; + } + + @Override + public DateTimeZone getTimezone() + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } + + @Override + public Map<String, Object> getContext() + { + return context; + } + + @Override + public Ordering<UnionResult> getResultOrdering() + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } + + @Override + public Query<UnionResult> withOverriddenContext(Map<String, Object> contextOverrides) + { + List<Query<?>> newQueries = mapQueries(q -> q.withOverriddenContext(contextOverrides)); + return new UnionQuery(newQueries, QueryContexts.override(getContext(), contextOverrides)); + } + + @Override + public Query<UnionResult> withQuerySegmentSpec(QuerySegmentSpec spec) + { + throw DruidException.defensive("Method supported. [%s]", DruidException.getCurrentMethodName()); + } + + @Override + public Query<UnionResult> withId(String id) + { + return withOverriddenContext(ImmutableMap.of(BaseQuery.QUERY_ID, id)); + } + + @Override + public String getId() + { + return context().getString(BaseQuery.QUERY_ID); + } + + @Override + public Query<UnionResult> withSubQueryId(String subQueryId) + { + return withOverriddenContext(ImmutableMap.of(BaseQuery.SUB_QUERY_ID, subQueryId)); + } + + @Override + public String getSubQueryId() + { + return context().getString(BaseQuery.SUB_QUERY_ID); + } + + @Override + public Query<UnionResult> withDataSource(DataSource dataSource) + { + throw new RuntimeException("This method is not supported. Use withDataSources instead!"); + } + + @Override + public Query<UnionResult> withDataSources(List<DataSource> children) + { + Preconditions.checkArgument(queries.size() == children.size(), "Number of children must match number of queries"); + List<Query<?>> newQueries = new ArrayList<>(); + for (int i = 0; i < queries.size(); i++) { + newQueries.add(queries.get(i).withDataSource(children.get(i))); + } + return new UnionQuery(newQueries, context); + } + + List<Query<?>> mapQueries(Function<Query<?>, Query<?>> mapFn) + { + List<Query<?>> newQueries = new ArrayList<>(); + for (Query<?> query : queries) { + newQueries.add(mapFn.apply(query)); + } + return newQueries; + } + + @Override + public String toString() + { + return "UnionQuery [context=" + context + ", queries=" + queries + "]"; + } + + @Override + public DataSourceAnalysis getDataSourceAnalysis() + { + OpagueDataSourceCover ds = new OpagueDataSourceCover(new UnionDataSource(getDataSources())); Review Comment: spelling nit: you have `g` instead of `q` for `opaque` ########## processing/src/main/java/org/apache/druid/query/QueryRunnerFactoryConglomerate.java: ########## @@ -21,7 +21,9 @@ /** */ -public interface QueryRunnerFactoryConglomerate +public interface QueryRunnerFactoryConglomerate extends QueryToolChestWarehouse Review Comment: I left this comment elsewhere, but I don't think we need the Warehouse interface really and should just expand what the Conglomerate can do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
