This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7682b0b6b1 Analysis refactor (#13501)
7682b0b6b1 is described below
commit 7682b0b6b185e4879ba202b0b777fa655e3d4f59
Author: somu-imply <[email protected]>
AuthorDate: Mon Dec 12 17:35:44 2022 -0800
Analysis refactor (#13501)
Refactor DataSource to have a getAnalysis() method
This removes various parts of the code where while loops and instanceof
checks were being used to walk through the structure of DataSource objects
in order to build a DataSourceAnalysis. Instead we just ask the DataSource
for its analysis and allow the stack to rebuild whatever structure existed.
---
.../materializedview/DataSourceOptimizer.java | 4 +-
.../apache/druid/msq/querykit/DataSourcePlan.java | 2 +-
.../druid/msq/querykit/InputNumberDataSource.java | 7 ++
.../overlord/SingleTaskBackgroundRunner.java | 2 +-
.../java/org/apache/druid/query/BaseQuery.java | 4 +-
.../java/org/apache/druid/query/DataSource.java | 6 +
.../org/apache/druid/query/InlineDataSource.java | 7 ++
.../org/apache/druid/query/JoinDataSource.java | 54 +++++++-
.../org/apache/druid/query/LookupDataSource.java | 7 ++
.../main/java/org/apache/druid/query/Queries.java | 3 +-
.../org/apache/druid/query/QueryDataSource.java | 14 +++
.../org/apache/druid/query/TableDataSource.java | 7 ++
.../org/apache/druid/query/UnionDataSource.java | 8 ++
.../org/apache/druid/query/UnionQueryRunner.java | 2 +-
.../org/apache/druid/query/UnnestDataSource.java | 8 ++
.../druid/query/planning/DataSourceAnalysis.java | 136 ++++-----------------
.../query/planning/DataSourceAnalysisTest.java | 84 +++----------
.../apache/druid/segment/join/NoopDataSource.java | 8 ++
.../org/apache/druid/client/BrokerServerView.java | 2 +-
.../druid/client/CachingClusteredClient.java | 2 +-
.../org/apache/druid/client/ServerViewUtil.java | 2 +-
.../join/BroadcastTableJoinableFactory.java | 2 +-
.../appenderator/SinkQuerySegmentWalker.java | 12 +-
.../UnifiedIndexerAppenderatorsManager.java | 4 +-
.../apache/druid/server/ClientInfoResource.java | 3 +-
.../druid/server/ClientQuerySegmentWalker.java | 16 +--
.../druid/server/LocalQuerySegmentWalker.java | 19 +--
.../org/apache/druid/server/SegmentManager.java | 2 +-
.../druid/server/coordination/ServerManager.java | 17 +--
.../apache/druid/client/BrokerServerViewTest.java | 17 ++-
.../org/apache/druid/client/SimpleServerView.java | 2 +-
.../apache/druid/server/SegmentManagerTest.java | 3 +-
.../server/TestClusterQuerySegmentWalker.java | 16 +--
.../server/coordination/ServerManagerTest.java | 51 +++++++-
.../sql/calcite/external/ExternalDataSource.java | 7 ++
.../apache/druid/sql/calcite/rel/DruidQuery.java | 3 +-
.../druid/sql/calcite/run/NativeQueryMaker.java | 9 +-
.../schema/SegmentDataCacheConcurrencyTest.java | 4 +-
38 files changed, 297 insertions(+), 259 deletions(-)
diff --git
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
index a3c03a6245..38774b1ca0 100644
---
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
+++
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
@@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.topn.TopNQuery;
@@ -122,9 +121,10 @@ public class DataSourceOptimizer
List<Interval> remainingQueryIntervals = (List<Interval>)
query.getIntervals();
for (DerivativeDataSource derivativeDataSource :
ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
+ TableDataSource tableDataSource = new
TableDataSource(derivativeDataSource.getName());
final List<Interval> derivativeIntervals =
remainingQueryIntervals.stream()
.flatMap(interval -> serverView
- .getTimeline(DataSourceAnalysis.forDataSource(new
TableDataSource(derivativeDataSource.getName())))
+ .getTimeline(tableDataSource.getAnalysis())
.orElseThrow(() -> new ISE("No timeline for dataSource: %s",
derivativeDataSource.getName()))
.lookup(interval)
.stream()
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index 30544cf31b..6868fef56f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -289,7 +289,7 @@ public class DataSourcePlan
)
{
final QueryDefinitionBuilder subQueryDefBuilder =
QueryDefinition.builder();
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(dataSource);
+ final DataSourceAnalysis analysis = dataSource.getAnalysis();
final DataSourcePlan basePlan = forDataSource(
queryKit,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
index 2fb4a61ee8..86207261f9 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@@ -106,6 +107,12 @@ public class InputNumberDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
@JsonProperty
public int getInputNumber()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index b421f97519..fce76d1d1a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -387,7 +387,7 @@ public class SingleTaskBackgroundRunner implements
TaskRunner, QuerySegmentWalke
QueryRunner<T> queryRunner = null;
if (runningItem != null) {
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final Task task = runningItem.getTask();
if (analysis.getBaseTableDataSource().isPresent()
diff --git a/processing/src/main/java/org/apache/druid/query/BaseQuery.java
b/processing/src/main/java/org/apache/druid/query/BaseQuery.java
index 6158632a15..ed97c549b6 100644
--- a/processing/src/main/java/org/apache/druid/query/BaseQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/BaseQuery.java
@@ -30,7 +30,6 @@ import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
@@ -123,7 +122,8 @@ public abstract class BaseQuery<T> implements Query<T>
@VisibleForTesting
public static QuerySegmentSpec getQuerySegmentSpecForLookUp(BaseQuery<?>
query)
{
- return DataSourceAnalysis.forDataSource(query.getDataSource())
+ DataSource queryDataSource = query.getDataSource();
+ return queryDataSource.getAnalysis()
.getBaseQuerySegmentSpec()
.orElseGet(query::getQuerySegmentSpec);
}
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 43dfb3be85..6b26cffbc6 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -124,4 +124,10 @@ public interface DataSource
*/
byte[] getCacheKey();
+ /**
+ * Get the analysis for this data source.
+ *
+ * @return The {@link DataSourceAnalysis} object for this data source
+ */
+ DataSourceAnalysis getAnalysis();
}
diff --git
a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
index b3c6024039..3fb015d369 100644
--- a/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/InlineDataSource.java
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.RowAdapter;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.ColumnType;
@@ -261,6 +262,12 @@ public class InlineDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
/**
* Returns the row signature (map of column name to type) for this inline
datasource. Note that types may
* be null, meaning we know we have a column with a certain name, but we
don't know what its type is.
diff --git
a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
index 4b155e5b7e..9f44324503 100644
--- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -33,6 +33,7 @@ import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.Triple;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.cache.CacheKeyBuilder;
@@ -56,6 +57,7 @@ import
org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -119,7 +121,7 @@ public class JoinDataSource implements DataSource
);
this.leftFilter = leftFilter;
this.joinableFactoryWrapper = joinableFactoryWrapper;
- this.analysis = DataSourceAnalysis.forDataSource(this);
+ this.analysis = this.getAnalysisForDataSource();
}
/**
@@ -457,7 +459,7 @@ public class JoinDataSource implements DataSource
{
final List<PreJoinableClause> clauses = analysis.getPreJoinableClauses();
if (clauses.isEmpty()) {
- throw new IAE("No join clauses to build the cache key for data source
[%s]", analysis.getDataSource());
+ throw new IAE("No join clauses to build the cache key for data source
[%s]", this);
}
final CacheKeyBuilder keyBuilder;
@@ -479,4 +481,52 @@ public class JoinDataSource implements DataSource
}
return keyBuilder.build();
}
+
+ private DataSourceAnalysis getAnalysisForDataSource()
+ {
+ final Triple<DataSource, DimFilter, List<PreJoinableClause>> flattened =
flattenJoin(this);
+ return new DataSourceAnalysis(flattened.first, null, flattened.second,
flattened.third);
+ }
+
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return analysis;
+ }
+
+ /**
+ * Flatten a datasource into two parts: the left-hand side datasource (the
'base' datasource), and a list of join
+ * clauses, if any.
+ *
+ * @throws IllegalArgumentException if dataSource cannot be fully flattened.
+ */
+ private static Triple<DataSource, DimFilter, List<PreJoinableClause>>
flattenJoin(final JoinDataSource dataSource)
+ {
+ DataSource current = dataSource;
+ DimFilter currentDimFilter = null;
+ final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
+
+ while (current instanceof JoinDataSource) {
+ final JoinDataSource joinDataSource = (JoinDataSource) current;
+ current = joinDataSource.getLeft();
+ if (currentDimFilter != null) {
+ throw new IAE("Left filters are only allowed when left child is direct
table access");
+ }
+ currentDimFilter = joinDataSource.getLeftFilter();
+ preJoinableClauses.add(
+ new PreJoinableClause(
+ joinDataSource.getRightPrefix(),
+ joinDataSource.getRight(),
+ joinDataSource.getJoinType(),
+ joinDataSource.getConditionAnalysis()
+ )
+ );
+ }
+
+ // Join clauses were added in the order we saw them while traversing down,
but we need to apply them in the
+ // going-up order. So reverse them.
+ Collections.reverse(preJoinableClauses);
+
+ return Triple.of(current, currentDimFilter, preJoinableClauses);
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
index 0b40aea020..873ca88932 100644
--- a/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/LookupDataSource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@@ -120,6 +121,12 @@ public class LookupDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java
b/processing/src/main/java/org/apache/druid/query/Queries.java
index f1a46f3b49..c08f63938b 100644
--- a/processing/src/main/java/org/apache/druid/query/Queries.java
+++ b/processing/src/main/java/org/apache/druid/query/Queries.java
@@ -166,7 +166,8 @@ public class Queries
}
// Verify preconditions and invariants, just in case.
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(retVal.getDataSource());
+ final DataSource retDataSource = retVal.getDataSource();
+ final DataSourceAnalysis analysis = retDataSource.getAnalysis();
// Sanity check: query must be based on a single table.
if (!analysis.getBaseTableDataSource().isPresent()) {
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
index c495d0832f..f21e3bf1ac 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryDataSource.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@@ -112,6 +113,19 @@ public class QueryDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ final Query<?> subQuery = this.getQuery();
+ if (!(subQuery instanceof BaseQuery)) {
+ // We must verify that the subQuery is a BaseQuery, because it is
required to make
+ // "DataSourceAnalysis.getBaseQuerySegmentSpec" work properly.
+ // All built-in query types are BaseQuery, so we only expect this with
funky extension queries.
+ throw new IAE("Cannot analyze subquery of class[%s]",
subQuery.getClass().getName());
+ }
+ final DataSource current = subQuery.getDataSource();
+ return current.getAnalysis().maybeWithBaseQuery(subQuery);
+ }
@Override
public String toString()
diff --git
a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
index 2fbc6ca15f..fe9cf46e37 100644
--- a/processing/src/main/java/org/apache/druid/query/TableDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/TableDataSource.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@@ -117,6 +118,12 @@ public class TableDataSource implements DataSource
return new byte[0];
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
@Override
public String toString()
{
diff --git
a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
index 50802dc725..3f538f5ad5 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionDataSource.java
@@ -26,8 +26,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -129,6 +131,12 @@ public class UnionDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
index f38ce28f28..07b2947c27 100644
--- a/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/UnionQueryRunner.java
@@ -50,7 +50,7 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
{
Query<T> query = queryPlus.getQuery();
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
if (analysis.isConcreteTableBased() &&
analysis.getBaseUnionDataSource().isPresent()) {
// Union of tables.
diff --git
a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
index 4623701674..f548944948 100644
--- a/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/UnnestDataSource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.UnnestSegmentReference;
import org.apache.druid.utils.JvmUtils;
@@ -187,6 +188,13 @@ public class UnnestDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ final DataSource current = this.getBase();
+ return current.getAnalysis();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 63c2c8b815..1a6cf49f91 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -20,12 +20,10 @@
package org.apache.druid.query.planning;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.Triple;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
@@ -33,8 +31,6 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -59,8 +55,7 @@ import java.util.Optional;
* </pre>
*
* The base datasource (Db) is returned by {@link #getBaseDataSource()}. The
other leaf datasources are returned by
- * {@link #getPreJoinableClauses()}. The outer query datasources are available
as part of {@link #getDataSource()},
- * which just returns the original datasource that was provided for analysis.
+ * {@link #getPreJoinableClauses()}.
*
* The base datasource (Db) will never be a join, but it can be any other type
of datasource (table, query, etc).
* Note that join trees are only flattened if they occur at the top of the
overall tree (or underneath an outer query),
@@ -78,7 +73,6 @@ import java.util.Optional;
*/
public class DataSourceAnalysis
{
- private final DataSource dataSource;
private final DataSource baseDataSource;
@Nullable
private final Query<?> baseQuery;
@@ -86,8 +80,7 @@ public class DataSourceAnalysis
private final DimFilter joinBaseTableFilter;
private final List<PreJoinableClause> preJoinableClauses;
- private DataSourceAnalysis(
- DataSource dataSource,
+ public DataSourceAnalysis(
DataSource baseDataSource,
@Nullable Query<?> baseQuery,
@Nullable DimFilter joinBaseTableFilter,
@@ -97,99 +90,15 @@ public class DataSourceAnalysis
if (baseDataSource instanceof JoinDataSource) {
// The base cannot be a join (this is a class invariant).
// If it happens, it's a bug in the datasource analyzer.
- throw new IAE("Base dataSource cannot be a join! Original dataSource
was: %s", dataSource);
+ throw new IAE("Base dataSource cannot be a join! Original base
datasource was: %s", baseDataSource);
}
- this.dataSource = dataSource;
this.baseDataSource = baseDataSource;
this.baseQuery = baseQuery;
this.joinBaseTableFilter = joinBaseTableFilter;
this.preJoinableClauses = preJoinableClauses;
}
- public static DataSourceAnalysis forDataSource(final DataSource dataSource)
- {
- // Strip outer queries, retaining querySegmentSpecs as we go down (lowest
will become the 'baseQuerySegmentSpec').
- Query<?> baseQuery = null;
- DataSource current = dataSource;
-
- // This needs to be an or condition between QueryDataSource and
UnnestDataSource
- // As queries can have interleaving query and unnest data sources.
- // Ideally if each data source generate their own analysis object we can
avoid the or here
- // and have cleaner code. Especially as we increase the types of data
sources in future
- // these or checks will be tedious. Future development should move
forDataSource method
- // into each data source.
-
- while (current instanceof QueryDataSource || current instanceof
UnnestDataSource) {
- if (current instanceof QueryDataSource) {
- final Query<?> subQuery = ((QueryDataSource) current).getQuery();
-
- if (!(subQuery instanceof BaseQuery)) {
- // We must verify that the subQuery is a BaseQuery, because it is
required to make "getBaseQuerySegmentSpec"
- // work properly. All built-in query types are BaseQuery, so we only
expect this with funky extension queries.
- throw new IAE("Cannot analyze subquery of class[%s]",
subQuery.getClass().getName());
- }
-
- baseQuery = subQuery;
- current = subQuery.getDataSource();
- } else {
- final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
- current = unnestDataSource.getBase();
- }
- }
-
- if (current instanceof JoinDataSource) {
- final Triple<DataSource, DimFilter, List<PreJoinableClause>> flattened =
flattenJoin((JoinDataSource) current);
- return new DataSourceAnalysis(dataSource, flattened.first, baseQuery,
flattened.second, flattened.third);
- } else {
- return new DataSourceAnalysis(dataSource, current, baseQuery, null,
Collections.emptyList());
- }
- }
-
- /**
- * Flatten a datasource into two parts: the left-hand side datasource (the
'base' datasource), and a list of join
- * clauses, if any.
- *
- * @throws IllegalArgumentException if dataSource cannot be fully flattened.
- */
- private static Triple<DataSource, DimFilter, List<PreJoinableClause>>
flattenJoin(final JoinDataSource dataSource)
- {
- DataSource current = dataSource;
- DimFilter currentDimFilter = null;
- final List<PreJoinableClause> preJoinableClauses = new ArrayList<>();
-
- while (current instanceof JoinDataSource) {
- final JoinDataSource joinDataSource = (JoinDataSource) current;
- current = joinDataSource.getLeft();
- if (currentDimFilter != null) {
- throw new IAE("Left filters are only allowed when left child is direct
table access");
- }
- currentDimFilter = joinDataSource.getLeftFilter();
- preJoinableClauses.add(
- new PreJoinableClause(
- joinDataSource.getRightPrefix(),
- joinDataSource.getRight(),
- joinDataSource.getJoinType(),
- joinDataSource.getConditionAnalysis()
- )
- );
- }
-
- // Join clauses were added in the order we saw them while traversing down,
but we need to apply them in the
- // going-up order. So reverse them.
- Collections.reverse(preJoinableClauses);
-
- return Triple.of(current, currentDimFilter, preJoinableClauses);
- }
-
- /**
- * Returns the topmost datasource: the original one passed to {@link
#forDataSource(DataSource)}.
- */
- public DataSource getDataSource()
- {
- return dataSource;
- }
-
/**
* Returns the base (bottom-leftmost) datasource.
*/
@@ -230,7 +139,7 @@ public class DataSourceAnalysis
* the datasource tree. This is the query that will be applied to the base
datasource plus any joinables that might
* be present.
*
- * @return the query associated with the base datasource if {@link
#isQuery()} is true, else empty
+ * @return the query associated with the base datasource if there is one, else
empty
*/
public Optional<Query<?>> getBaseQuery()
{
@@ -253,7 +162,7 @@ public class DataSourceAnalysis
* <p>
* This {@link QuerySegmentSpec} is taken from the query returned by {@link
#getBaseQuery()}.
*
- * @return the query segment spec associated with the base datasource if
{@link #isQuery()} is true, else empty
+ * @return the query segment spec associated with the base datasource if a
base query is present, else empty
*/
public Optional<QuerySegmentSpec> getBaseQuerySegmentSpec()
{
@@ -261,20 +170,27 @@ public class DataSourceAnalysis
}
/**
- * Returns join clauses corresponding to joinable leaf datasources (every
leaf except the bottom-leftmost).
+ * Returns the data source analysis with or without the updated query.
+ * If the DataSourceAnalysis already has a non-null baseQuery, no update is
required.
+ * Otherwise, this method creates a new analysis object with the base query
provided in the input.
+ *
+ * @param query the query to add to the analysis if the baseQuery is null
+ * @return the existing analysis if it has a non-null baseQuery, else a new
one with the updated base query
*/
- public List<PreJoinableClause> getPreJoinableClauses()
+ public DataSourceAnalysis maybeWithBaseQuery(Query<?> query)
{
- return preJoinableClauses;
+ if (!getBaseQuery().isPresent()) {
+ return new DataSourceAnalysis(baseDataSource, query,
joinBaseTableFilter, preJoinableClauses);
+ }
+ return this;
}
/**
- * Returns true if all servers have the ability to compute this datasource.
These datasources depend only on
- * globally broadcast data, like lookups or inline data or broadcast
segments.
+ * Returns join clauses corresponding to joinable leaf datasources (every
leaf except the bottom-leftmost).
*/
- public boolean isGlobal()
+ public List<PreJoinableClause> getPreJoinableClauses()
{
- return dataSource.isGlobal();
+ return preJoinableClauses;
}
/**
@@ -309,15 +225,6 @@ public class DataSourceAnalysis
.allMatch(ds -> ds
instanceof TableDataSource)));
}
- /**
- * Returns true if this datasource represents a subquery (that is, whether
it is a {@link QueryDataSource}).
- */
- public boolean isQuery()
- {
- return dataSource instanceof QueryDataSource;
- }
-
-
/**
* Returns true if this datasource is made out of a join operation
*/
@@ -336,20 +243,19 @@ public class DataSourceAnalysis
return false;
}
DataSourceAnalysis that = (DataSourceAnalysis) o;
- return Objects.equals(dataSource, that.dataSource);
+ return Objects.equals(baseDataSource, that.baseDataSource);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSource);
+ return Objects.hash(baseDataSource);
}
@Override
public String toString()
{
return "DataSourceAnalysis{" +
- "dataSource=" + dataSource +
", baseDataSource=" + baseDataSource +
", baseQuery=" + baseQuery +
", preJoinableClauses=" + preJoinableClauses +
diff --git
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 4ea4397935..88e1335c19 100644
---
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -62,13 +62,10 @@ public class DataSourceAnalysisTest
@Test
public void testTable()
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(TABLE_FOO);
+ final DataSourceAnalysis analysis = TABLE_FOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(TABLE_FOO, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -82,13 +79,10 @@ public class DataSourceAnalysisTest
public void testUnion()
{
final UnionDataSource unionDataSource = new
UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(unionDataSource);
+ final DataSourceAnalysis analysis = unionDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(unionDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
@@ -102,13 +96,10 @@ public class DataSourceAnalysisTest
public void testQueryOnTable()
{
final QueryDataSource queryDataSource = subquery(TABLE_FOO);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(queryDataSource);
+ final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertTrue(analysis.isQuery());
- Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -126,13 +117,10 @@ public class DataSourceAnalysisTest
{
final UnionDataSource unionDataSource = new
UnionDataSource(ImmutableList.of(TABLE_FOO, TABLE_BAR));
final QueryDataSource queryDataSource = subquery(unionDataSource);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(queryDataSource);
+ final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertTrue(analysis.isQuery());
- Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
@@ -148,13 +136,10 @@ public class DataSourceAnalysisTest
@Test
public void testLookup()
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(LOOKUP_LOOKYLOO);
+ final DataSourceAnalysis analysis = LOOKUP_LOOKYLOO.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertTrue(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -168,13 +153,10 @@ public class DataSourceAnalysisTest
public void testQueryOnLookup()
{
final QueryDataSource queryDataSource = subquery(LOOKUP_LOOKYLOO);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(queryDataSource);
+ final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertTrue(analysis.isGlobal());
- Assert.assertTrue(analysis.isQuery());
- Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -190,13 +172,10 @@ public class DataSourceAnalysisTest
@Test
public void testInline()
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(INLINE);
+ final DataSourceAnalysis analysis = INLINE.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertTrue(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(INLINE, analysis.getDataSource());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -230,13 +209,10 @@ public class DataSourceAnalysisTest
JoinType.FULL
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@@ -277,13 +253,10 @@ public class DataSourceAnalysisTest
JoinType.FULL
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
@@ -331,13 +304,10 @@ public class DataSourceAnalysisTest
JoinType.RIGHT
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
@@ -378,13 +348,10 @@ public class DataSourceAnalysisTest
TrueDimFilter.instance()
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
@@ -411,13 +378,10 @@ public class DataSourceAnalysisTest
TrueDimFilter.instance()
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
@@ -442,13 +406,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
@@ -480,13 +441,10 @@ public class DataSourceAnalysisTest
)
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(queryDataSource);
+ final DataSourceAnalysis analysis = queryDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertTrue(analysis.isQuery());
- Assert.assertEquals(queryDataSource, analysis.getDataSource());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
@@ -528,13 +486,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertTrue(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -560,13 +515,10 @@ public class DataSourceAnalysisTest
JoinType.INNER
);
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(joinDataSource);
+ final DataSourceAnalysis analysis = joinDataSource.getAnalysis();
Assert.assertFalse(analysis.isConcreteBased());
Assert.assertFalse(analysis.isConcreteTableBased());
- Assert.assertFalse(analysis.isGlobal());
- Assert.assertFalse(analysis.isQuery());
- Assert.assertEquals(joinDataSource, analysis.getDataSource());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -587,10 +539,10 @@ public class DataSourceAnalysisTest
{
EqualsVerifier.forClass(DataSourceAnalysis.class)
.usingGetClass()
- .withNonnullFields("dataSource")
+ .withNonnullFields("baseDataSource")
// These fields are not necessary, because they're wholly
determined by "dataSource"
- .withIgnoredFields("baseDataSource", "baseQuery",
"preJoinableClauses", "joinBaseTableFilter")
+ .withIgnoredFields("baseQuery", "preJoinableClauses",
"joinBaseTableFilter")
.verify();
}
diff --git
a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
index b6eb168b62..c211137c85 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/NoopDataSource.java
@@ -21,8 +21,10 @@ package org.apache.druid.segment.join;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -89,4 +91,10 @@ public class NoopDataSource implements DataSource
{
return new byte[]{};
}
+
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
}
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 10a9c12c61..ce4aed72b4 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -353,7 +353,7 @@ public class BrokerServerView implements TimelineServerView
{
final TableDataSource table =
analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getDataSource()));
+ .orElseThrow(() -> new ISE("Cannot handle base datasource:
%s", analysis.getBaseDataSource()));
synchronized (lock) {
return Optional.ofNullable(timelines.get(table.getName()));
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 52b36a0276..5c0e5efb7e 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -279,7 +279,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
this.query = queryPlus.getQuery();
this.toolChest = warehouse.getToolChest(query);
this.strategy = toolChest.getCacheStrategy(query);
- this.dataSourceAnalysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ this.dataSourceAnalysis = query.getDataSource().getAnalysis();
this.useCache = CacheUtil.isUseSegmentCache(query, strategy,
cacheConfig, CacheUtil.ServerType.BROKER);
this.populateCache = CacheUtil.isPopulateSegmentCache(query, strategy,
cacheConfig, CacheUtil.ServerType.BROKER);
diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
index b9f1f91a5d..72ad84b4e6 100644
--- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
+++ b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
@@ -57,7 +57,7 @@ public class ServerViewUtil
int numCandidates
)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(datasource);
+ final DataSourceAnalysis analysis = datasource.getAnalysis();
final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline = serverView.getTimeline(analysis);
if (!maybeTimeline.isPresent()) {
return Collections.emptyList();
diff --git
a/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
index b645a44d44..39794eb7b7 100644
---
a/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
@@ -74,7 +74,7 @@ public class BroadcastTableJoinableFactory implements
JoinableFactory
private Optional<ReferenceCountingIndexedTable>
getOnlyIndexedTable(DataSource dataSource)
{
GlobalTableDataSource broadcastDataSource = (GlobalTableDataSource)
dataSource;
- DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSource);
+ DataSourceAnalysis analysis = dataSource.getAnalysis();
return segmentManager.getIndexedTables(analysis).flatMap(tables -> {
Iterator<ReferenceCountingIndexedTable> tableIterator =
tables.iterator();
if (!tableIterator.hasNext()) {
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 389dfc4848..46315dbc0d 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -36,6 +36,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
@@ -146,11 +147,12 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query,
final Iterable<SegmentDescriptor> specs)
{
// We only handle one particular dataSource. Make sure that's what we
have, then ignore from here on out.
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSource dataSourceFromQuery = query.getDataSource();
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
// Sanity check: make sure the query is based on the table we're meant to
handle.
if (!analysis.getBaseTableDataSource().filter(ds ->
dataSource.equals(ds.getName())).isPresent()) {
- throw new ISE("Cannot handle datasource: %s", analysis.getDataSource());
+ throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
final QueryRunnerFactory<T, Query<T>> factory =
conglomerate.findFactory(query);
@@ -163,12 +165,12 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
// Make sure this query type can handle the subquery, if present.
- if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource)
analysis.getDataSource()).getQuery())) {
- throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+ if ((dataSourceFromQuery instanceof QueryDataSource) &&
!toolChest.canPerformSubquery(((QueryDataSource)
dataSourceFromQuery).getQuery())) {
+ throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
// segmentMapFn maps each base Segment into a joined Segment if necessary.
- final Function<SegmentReference, SegmentReference> segmentMapFn =
analysis.getDataSource()
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery
.createSegmentMapFunction(
query,
cpuTimeAccumulator
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 0390e32b22..9e4054151b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -360,11 +360,11 @@ public class UnifiedIndexerAppenderatorsManager
implements AppenderatorsManager
@VisibleForTesting
<T> DatasourceBundle getBundle(final Query<T> query)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final TableDataSource table =
analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getDataSource()));
+ .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
query.getDataSource()));
final DatasourceBundle bundle;
diff --git
a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
index 08032a5efe..e6632edfe5 100644
--- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
+++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
@@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.LocatedSegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizationUtils;
@@ -156,7 +155,7 @@ public class ClientInfoResource
}
final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline =
- timelineServerView.getTimeline(DataSourceAnalysis.forDataSource(new
TableDataSource(dataSourceName)));
+ timelineServerView.getTimeline((new
TableDataSource(dataSourceName)).getAnalysis());
final Optional<Iterable<TimelineObjectHolder<String, ServerSelector>>>
maybeServersLookup =
maybeTimeline.map(timeline -> timeline.lookup(theInterval));
if (!maybeServersLookup.isPresent() ||
Iterables.isEmpty(maybeServersLookup.get())) {
diff --git
a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
index 638e229192..61f7817e2f 100644
--- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java
@@ -239,16 +239,17 @@ public class ClientQuerySegmentWalker implements
QuerySegmentWalker
*/
private <T> boolean canRunQueryUsingLocalWalker(Query<T> query)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSource dataSourceFromQuery = query.getDataSource();
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final QueryToolChest<T, Query<T>> toolChest =
warehouse.getToolChest(query);
// 1) Must be based on a concrete datasource that is not a table.
// 2) Must be based on globally available data (so we have a copy here on
the Broker).
// 3) If there is an outer query, it must be handleable by the query
toolchest (the local walker does not handle
// subqueries on its own).
- return analysis.isConcreteBased() && !analysis.isConcreteTableBased() &&
analysis.isGlobal()
- && (!analysis.isQuery()
- || toolChest.canPerformSubquery(((QueryDataSource)
analysis.getDataSource()).getQuery()));
+ return analysis.isConcreteBased() && !analysis.isConcreteTableBased() &&
dataSourceFromQuery.isGlobal()
+ && (!(dataSourceFromQuery instanceof QueryDataSource)
+ || toolChest.canPerformSubquery(((QueryDataSource)
dataSourceFromQuery).getQuery()));
}
/**
@@ -257,15 +258,16 @@ public class ClientQuerySegmentWalker implements
QuerySegmentWalker
*/
private <T> boolean canRunQueryUsingClusterWalker(Query<T> query)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSource dataSourceFromQuery = query.getDataSource();
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final QueryToolChest<T, Query<T>> toolChest =
warehouse.getToolChest(query);
// 1) Must be based on a concrete table (the only shape the Druid cluster
can handle).
// 2) If there is an outer query, it must be handleable by the query
toolchest (the cluster walker does not handle
// subqueries on its own).
return analysis.isConcreteTableBased()
- && (!analysis.isQuery()
- || toolChest.canPerformSubquery(((QueryDataSource)
analysis.getDataSource()).getQuery()));
+ && (!(dataSourceFromQuery instanceof QueryDataSource)
+ || toolChest.canPerformSubquery(((QueryDataSource)
dataSourceFromQuery).getQuery()));
}
diff --git
a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index 767e5fbd40..d82eb6e785 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
@@ -47,7 +48,7 @@ import java.util.stream.StreamSupport;
* Processor that computes Druid queries, single-threaded.
*
* The datasource for the query must satisfy {@link
DataSourceAnalysis#isConcreteBased()} and
- * {@link DataSourceAnalysis#isGlobal()}. Its base datasource must also be
handleable by the provided
+ * {@link DataSource#isGlobal()}. Its base datasource must also be handleable
by the provided
* {@link SegmentWrangler}.
*
* Mainly designed to be used by {@link ClientQuerySegmentWalker}.
@@ -79,10 +80,11 @@ public class LocalQuerySegmentWalker implements
QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query,
final Iterable<Interval> intervals)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSource dataSourceFromQuery = query.getDataSource();
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
- if (!analysis.isConcreteBased() || !analysis.isGlobal()) {
- throw new IAE("Cannot query dataSource locally: %s",
analysis.getDataSource());
+ if (!analysis.isConcreteBased() || !dataSourceFromQuery.isGlobal()) {
+ throw new IAE("Cannot query dataSource locally: %s",
dataSourceFromQuery);
}
// wrap in ReferenceCountingSegment, these aren't currently managed by
SegmentManager so reference tracking doesn't
@@ -93,10 +95,11 @@ public class LocalQuerySegmentWalker implements
QuerySegmentWalker
final AtomicLong cpuAccumulator = new AtomicLong(0L);
- final Function<SegmentReference, SegmentReference> segmentMapFn =
- analysis
- .getDataSource()
- .createSegmentMapFunction(query, cpuAccumulator);
+ final Function<SegmentReference, SegmentReference> segmentMapFn =
dataSourceFromQuery
+ .createSegmentMapFunction(
+ query,
+ cpuAccumulator
+ );
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory =
conglomerate.findFactory(query);
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index ff509272a8..6ce441b2ab 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -238,7 +238,7 @@ public class SegmentManager
private TableDataSource getTableDataSource(DataSourceAnalysis analysis)
{
return analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getDataSource()));
+ .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getBaseDataSource()));
}
public boolean loadSegment(final DataSegment segment, boolean lazy,
SegmentLazyLoadFailCallback loadFailed)
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index 7990a44b85..fa670e0439 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -35,6 +35,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
@@ -123,7 +124,7 @@ public class ServerManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query,
Iterable<Interval> intervals)
{
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
final Optional<VersionedIntervalTimeline<String,
ReferenceCountingSegment>> maybeTimeline =
segmentManager.getTimeline(analysis);
@@ -165,19 +166,20 @@ public class ServerManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query,
Iterable<SegmentDescriptor> specs)
{
+ final DataSource dataSourceFromQuery = query.getDataSource();
final QueryRunnerFactory<T, Query<T>> factory =
conglomerate.findFactory(query);
if (factory == null) {
final QueryUnsupportedException e = new QueryUnsupportedException(
StringUtils.format("Unknown query type, [%s]", query.getClass())
);
log.makeAlert(e, "Error while executing a query[%s]", query.getId())
- .addData("dataSource", query.getDataSource())
+ .addData("dataSource", dataSourceFromQuery)
.emit();
throw e;
}
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
- final DataSourceAnalysis analysis =
DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
final AtomicLong cpuTimeAccumulator = new AtomicLong(0L);
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
@@ -185,8 +187,9 @@ public class ServerManager implements QuerySegmentWalker
segmentManager.getTimeline(analysis);
// Make sure this query type can handle the subquery, if present.
- if (analysis.isQuery() && !toolChest.canPerformSubquery(((QueryDataSource)
analysis.getDataSource()).getQuery())) {
- throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+ if ((dataSourceFromQuery instanceof QueryDataSource)
+ && !toolChest.canPerformSubquery(((QueryDataSource)
dataSourceFromQuery).getQuery())) {
+ throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
if (maybeTimeline.isPresent()) {
@@ -195,11 +198,11 @@ public class ServerManager implements QuerySegmentWalker
return new
ReportTimelineMissingSegmentQueryRunner<>(Lists.newArrayList(specs));
}
final Function<SegmentReference, SegmentReference> segmentMapFn =
- query.getDataSource()
+ dataSourceFromQuery
.createSegmentMapFunction(query, cpuTimeAccumulator);
// We compute the datasource's cache key here itself so it doesn't need to
be re-computed for every segment
- final Optional<byte[]> cacheKeyPrefix =
Optional.ofNullable(query.getDataSource().getCacheKey());
+ final Optional<byte[]> cacheKeyPrefix =
Optional.ofNullable(dataSourceFromQuery.getCacheKey());
final FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(specs)
diff --git
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index bbd9a5844f..8af8e7372e 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -40,7 +40,6 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@@ -110,7 +109,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup<String, ServerSelector> timeline =
brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new
TableDataSource("test_broker_server_view"))
+ (new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes =
timeline.lookup(intervals);
Assert.assertEquals(1, serverLookupRes.size());
@@ -172,7 +171,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new
TableDataSource("test_broker_server_view"))
+ (new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
Arrays.asList(
@@ -195,7 +194,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(4);
timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+ (new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
Arrays.asList(
@@ -274,7 +273,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+ (new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
assertValues(
@@ -298,7 +297,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(5);
timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource("test_broker_server_view"))
+ (new TableDataSource("test_broker_server_view")).getAnalysis()
).get();
// expect same set of segments as before
@@ -354,7 +353,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
+ (new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1
@@ -414,7 +413,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
+ (new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1
@@ -476,7 +475,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline = brokerServerView.getTimeline(
- DataSourceAnalysis.forDataSource(new TableDataSource(segment1.getDataSource()))
+ (new TableDataSource(segment1.getDataSource())).getAnalysis()
).get();
// Verify that the timeline has no entry for the interval of segment 1
diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
index 68add5fb73..420b7b965d 100644
--- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
+++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
@@ -140,7 +140,7 @@ public class SimpleServerView implements TimelineServerView
{
final TableDataSource table =
analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
+ .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getBaseDataSource()));
return Optional.ofNullable(timelines.get(table.getName()));
}
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 9d1ac4c4e2..620dbd26eb 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
@@ -386,7 +385,7 @@ public class SegmentManagerTest
{
Assert.assertEquals(
Optional.empty(),
- segmentManager.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource("nonExisting")))
+ segmentManager.getTimeline((new TableDataSource("nonExisting")).getAnalysis())
);
}
diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 000626f16e..47a3ddc46d 100644
--- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
@@ -92,7 +93,7 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// Strange, but true. Required to get authentic behavior with UnionDataSources. (Although, it would be great if
// this wasn't required.)
return (queryPlus, responseContext) -> {
- final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryPlus.getQuery().getDataSource());
+ final DataSourceAnalysis analysis = queryPlus.getQuery().getDataSource().getAnalysis();
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", queryPlus.getQuery().getDataSource());
@@ -112,15 +113,16 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
+ final DataSource dataSourceFromQuery = query.getDataSource();
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
- final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
+ final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
if (!analysis.isConcreteTableBased()) {
- throw new ISE("Cannot handle datasource: %s", query.getDataSource());
+ throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
@@ -128,12 +130,12 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
// Make sure this query type can handle the subquery, if present.
- if (analysis.isQuery()
- && !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
- throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
+ if ((dataSourceFromQuery instanceof QueryDataSource)
+ && !toolChest.canPerformSubquery(((QueryDataSource) dataSourceFromQuery).getQuery())) {
+ throw new ISE("Cannot handle subquery: %s", dataSourceFromQuery);
}
- final Function<SegmentReference, SegmentReference> segmentMapFn = analysis.getDataSource().createSegmentMapFunction(
+ final Function<SegmentReference, SegmentReference> segmentMapFn = dataSourceFromQuery.createSegmentMapFunction(
query,
new AtomicLong()
);
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index b55e22feb8..433759ba82 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -29,8 +29,10 @@ import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.LocalCacheProvider;
import org.apache.druid.collections.bitmap.BitmapFactory;
+import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.Pair;
@@ -50,6 +52,7 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
@@ -66,7 +69,6 @@ import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
@@ -140,7 +142,7 @@ public class ServerManagerTest
public void setUp()
{
EmittingLogger.registerEmitter(new NoopServiceEmitter());
-
+ NullHandling.initializeForTests();
queryWaitLatch = new CountDownLatch(1);
queryWaitYieldLatch = new CountDownLatch(1);
queryNotifyLatch = new CountDownLatch(1);
@@ -469,6 +471,25 @@ public class ServerManagerTest
Assert.assertSame(NoopQueryRunner.class, queryRunner.getClass());
}
+ @Test(expected = ISE.class)
+ public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegmentsOnQueryDataSource()
+ {
+ final Interval interval = Intervals.of("0000-01-01/P1D");
+ final SearchQuery query = searchQueryWithQueryDataSource("unknown_datasource", interval, Granularities.ALL);
+ final List<SegmentDescriptor> unknownSegments = Collections.singletonList(
+ new SegmentDescriptor(interval, "unknown_version", 0)
+ );
+ final QueryRunner<Result<SearchResultValue>> queryRunner = serverManager.getQueryRunnerForSegments(
+ query,
+ unknownSegments
+ );
+ final ResponseContext responseContext = DefaultResponseContext.createEmpty();
+ final List<Result<SearchResultValue>> results = queryRunner.run(QueryPlus.wrap(query), responseContext).toList();
+ Assert.assertTrue(results.isEmpty());
+ Assert.assertNotNull(responseContext.getMissingSegments());
+ Assert.assertEquals(unknownSegments, responseContext.getMissingSegments());
+ }
+
@Test
public void testGetQueryRunnerForSegmentsWhenTimelineIsMissingReportingMissingSegments()
{
@@ -533,7 +554,7 @@ public class ServerManagerTest
final Interval interval = Intervals.of("P1d/2011-04-01");
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>> maybeTimeline = segmentManager
- .getTimeline(DataSourceAnalysis.forDataSource(query.getDataSource()));
+ .getTimeline(query.getDataSource().getAnalysis());
Assert.assertTrue(maybeTimeline.isPresent());
final List<TimelineObjectHolder<String, ReferenceCountingSegment>> holders = maybeTimeline.get().lookup(interval);
final List<SegmentDescriptor> closedSegments = new ArrayList<>();
@@ -638,6 +659,30 @@ public class ServerManagerTest
.build();
}
+
+ private SearchQuery searchQueryWithQueryDataSource(String datasource, Interval interval, Granularity granularity)
+ {
+ final ImmutableList<SegmentDescriptor> descriptors = ImmutableList.of(
+ new SegmentDescriptor(Intervals.of("2000/3000"), "0", 0),
+ new SegmentDescriptor(Intervals.of("2000/3000"), "0", 1)
+ );
+ return Druids.newSearchQueryBuilder()
+ .dataSource(
+ new QueryDataSource(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(datasource)
+ .intervals(new MultipleSpecificSegmentSpec(descriptors))
+ .granularity(Granularities.ALL)
+ .build()
+ )
+ )
+ .intervals(Collections.singletonList(interval))
+ .granularity(granularity)
+ .limit(10000)
+ .query("wow")
+ .build();
+ }
+
private Future assertQueryable(
Granularity granularity,
String dataSource,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
index bd2c8cb738..e74a0eaefb 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalDataSource.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.column.RowSignature;
@@ -144,6 +145,12 @@ public class ExternalDataSource implements DataSource
return null;
}
+ @Override
+ public DataSourceAnalysis getAnalysis()
+ {
+ return new DataSourceAnalysis(this, null, null, Collections.emptyList());
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index f4601c03b3..4c7340a14d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -65,7 +65,6 @@ import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.ordering.StringComparator;
import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -842,7 +841,7 @@ public class DruidQuery
return true;
}
- if (DataSourceAnalysis.forDataSource(dataSource).isConcreteTableBased()) {
+ if (dataSource.getAnalysis().isConcreteTableBased()) {
// Always OK: queries on concrete tables (regular Druid datasources) use segment-based storage adapters
// (IncrementalIndex or QueryableIndex). These clip query interval to data interval, making wide query
// intervals safer. They do not have special checks for granularity and interval safety.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index 258a744827..1bd1e58193 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -45,7 +45,6 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.OrDimFilter;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
@@ -167,10 +166,10 @@ public class NativeQueryMaker implements QueryMaker
private List<Interval> findBaseDataSourceIntervals(Query<?> query)
{
- return DataSourceAnalysis.forDataSource(query.getDataSource())
- .getBaseQuerySegmentSpec()
- .map(QuerySegmentSpec::getIntervals)
- .orElseGet(query::getIntervals);
+ return query.getDataSource().getAnalysis()
+ .getBaseQuerySegmentSpec()
+ .map(QuerySegmentSpec::getIntervals)
+ .orElseGet(query::getIntervals);
}
@SuppressWarnings("unchecked")
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java
index ab74255d69..0adc174756 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.java
@@ -47,7 +47,6 @@ import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -71,7 +70,6 @@ import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
-
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
@@ -218,7 +216,7 @@ public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon
for (int i = 0; i < 1000; i++) {
boolean hasTimeline = exec.submit(
- () -> serverView.getTimeline(DataSourceAnalysis.forDataSource(new TableDataSource(DATASOURCE)))
+ () -> serverView.getTimeline((new TableDataSource(DATASOURCE)).getAnalysis())
.isPresent()
).get(100, TimeUnit.MILLISECONDS);
Assert.assertTrue(hasTimeline);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]