This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 11be3a953eb Reduce usage of DataSourceAnalysis in interfaces (#17724)
11be3a953eb is described below
commit 11be3a953eb218f1056c7500209368125390c324
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Feb 19 09:41:56 2025 +0100
Reduce usage of DataSourceAnalysis in interfaces (#17724)
---
.github/scripts/create-jacoco-coverage-report.sh | 5 ++
.../query/CachingClusteredClientBenchmark.java | 6 +-
.../materializedview/DataSourceOptimizer.java | 10 ++--
.../movingaverage/MovingAverageQueryTest.java | 4 +-
.../dart/controller/DartTableInputSpecSlicer.java | 2 +-
.../msq/dart/worker/DartDataSegmentProvider.java | 2 +-
.../controller/DartTableInputSpecSlicerTest.java | 4 +-
.../overlord/SingleTaskBackgroundRunner.java | 7 +--
.../main/java/org/apache/druid/query/Queries.java | 4 +-
.../apache/druid/query/RestrictedDataSource.java | 2 +-
.../druid/query/planning/DataSourceAnalysis.java | 23 ++++----
.../java/org/apache/druid/query/QueriesTest.java | 18 +++---
.../query/planning/DataSourceAnalysisTest.java | 67 ++++++++++++++++------
.../org/apache/druid/client/BrokerServerView.java | 9 +--
.../druid/client/CachingClusteredClient.java | 2 +-
.../org/apache/druid/client/ServerViewUtil.java | 2 +-
.../apache/druid/client/TimelineServerView.java | 11 ++--
.../join/BroadcastTableJoinableFactory.java | 4 +-
.../appenderator/SinkQuerySegmentWalker.java | 2 +-
.../UnifiedIndexerAppenderatorsManager.java | 8 +--
.../apache/druid/server/ClientInfoResource.java | 2 +-
.../org/apache/druid/server/SegmentManager.java | 27 ++-------
.../druid/server/coordination/ServerManager.java | 6 +-
.../apache/druid/client/BrokerServerViewTest.java | 16 +++---
.../CachingClusteredClientFunctionalityTest.java | 4 +-
.../druid/client/CachingClusteredClientTest.java | 4 +-
.../org/apache/druid/client/SimpleServerView.java | 7 +--
.../druid/server/ClientInfoResourceTest.java | 6 +-
.../apache/druid/server/SegmentManagerTest.java | 2 +-
.../server/TestClusterQuerySegmentWalker.java | 4 +-
.../server/coordination/ServerManagerTest.java | 2 +-
.../druid/server/http/MetadataResourceTest.java | 7 ++-
.../BrokerSegmentMetadataCacheConcurrencyTest.java | 2 +-
.../sql/calcite/util/TestTimelineServerView.java | 4 +-
34 files changed, 140 insertions(+), 145 deletions(-)
diff --git a/.github/scripts/create-jacoco-coverage-report.sh
b/.github/scripts/create-jacoco-coverage-report.sh
index 97f18bebeb6..b5af7765013 100755
--- a/.github/scripts/create-jacoco-coverage-report.sh
+++ b/.github/scripts/create-jacoco-coverage-report.sh
@@ -20,6 +20,11 @@ set -x
echo "GITHUB_BASE_REF: ${GITHUB_BASE_REF}"
+if [ "$GITHUB_BASE_REF" == "" ] ;then
+ echo "GITHUB_BASE_REF is not set; skipping this check!"
+ exit 0
+fi
+
echo "Setting up git remote"
git remote set-branches --add origin ${GITHUB_BASE_REF}
git fetch
diff --git
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index fdb072187d9..58361aba753 100644
---
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -68,6 +68,7 @@ import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -81,7 +82,6 @@ import
org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -504,9 +504,9 @@ public class CachingClusteredClientBenchmark
}
@Override
- public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource table)
{
- return
Optional.ofNullable(timelines.get(analysis.getBaseTableDataSource().get().getName()));
+ return Optional.ofNullable(timelines.get(table.getName()));
}
@Override
diff --git
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
index 38774b1ca02..823aecf6360 100644
---
a/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
+++
b/extensions-contrib/materialized-view-selection/src/main/java/org/apache/druid/query/materializedview/DataSourceOptimizer.java
@@ -84,7 +84,7 @@ public class DataSourceOptimizer
return Collections.singletonList(query);
}
String datasourceName = ((TableDataSource)
query.getDataSource()).getName();
- // get all derivatives for datasource in query. The derivatives set is
sorted by average size of
+ // get all derivatives for datasource in query. The derivatives set is
sorted by average size of
// per segment granularity.
Set<DerivativeDataSource> derivatives =
DerivativeDataSourceManager.getDerivatives(datasourceName);
@@ -118,13 +118,13 @@ public class DataSourceOptimizer
}
List<Query> queries = new ArrayList<>();
- List<Interval> remainingQueryIntervals = (List<Interval>)
query.getIntervals();
+ List<Interval> remainingQueryIntervals = query.getIntervals();
for (DerivativeDataSource derivativeDataSource :
ImmutableSortedSet.copyOf(derivativesWithRequiredFields)) {
TableDataSource tableDataSource = new
TableDataSource(derivativeDataSource.getName());
final List<Interval> derivativeIntervals =
remainingQueryIntervals.stream()
.flatMap(interval -> serverView
- .getTimeline(tableDataSource.getAnalysis())
+ .getTimeline(tableDataSource)
.orElseThrow(() -> new ISE("No timeline for dataSource: %s",
derivativeDataSource.getName()))
.lookup(interval)
.stream()
@@ -132,7 +132,7 @@ public class DataSourceOptimizer
)
.collect(Collectors.toList());
// if the derivative does not contain any parts of intervals in the
query, the derivative will
- // not be selected.
+ // not be selected.
if (derivativeIntervals.isEmpty()) {
continue;
}
@@ -154,7 +154,7 @@ public class DataSourceOptimizer
}
//after materialized view selection, the result of the remaining query
interval will be computed based on
- // the original datasource.
+ // the original datasource.
if (!remainingQueryIntervals.isEmpty()) {
queries.add(query.withQuerySegmentSpec(new
MultipleIntervalSegmentSpec(remainingQueryIntervals)));
}
diff --git
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 1d645b2c7fd..dd5f79eecf5 100644
---
a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++
b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -61,10 +61,10 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.Result;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.movingaverage.test.TestConfig;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.MapJoinableFactory;
@@ -322,7 +322,7 @@ public class MovingAverageQueryTest extends
InitializedNullHandlingTest
new TimelineServerView()
{
@Override
- public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource analysis)
{
return Optional.empty();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
index 13a91125298..f276caa435e 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
@@ -102,7 +102,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
{
final TableInputSpec tableInputSpec = (TableInputSpec) inputSpec;
final TimelineLookup<String, ServerSelector> timeline =
- serverView.getTimeline(new
TableDataSource(tableInputSpec.getDataSource()).getAnalysis()).orElse(null);
+ serverView.getTimeline(new
TableDataSource(tableInputSpec.getDataSource())).orElse(null);
if (timeline == null) {
return Collections.emptyList();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataSegmentProvider.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataSegmentProvider.java
index 0e8a38af90a..0566b6bfc26 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataSegmentProvider.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataSegmentProvider.java
@@ -65,7 +65,7 @@ public class DartDataSegmentProvider implements
DataSegmentProvider
return () -> {
final Optional<VersionedIntervalTimeline<String,
ReferenceCountingSegment>> timeline =
- segmentManager.getTimeline(new
TableDataSource(segmentId.getDataSource()).getAnalysis());
+ segmentManager.getTimeline(new
TableDataSource(segmentId.getDataSource()));
if (!timeline.isPresent()) {
throw segmentNotFound(segmentId);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index 64b82f5ff57..92ce2bd2a1c 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -237,9 +237,9 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
}
Mockito.when(serverView.getDruidServerMetadatas()).thenReturn(SERVERS);
- Mockito.when(serverView.getTimeline(new
TableDataSource(DATASOURCE).getAnalysis()))
+ Mockito.when(serverView.getTimeline(new TableDataSource(DATASOURCE)))
.thenReturn(Optional.of(timeline));
- Mockito.when(serverView.getTimeline(new
TableDataSource(DATASOURCE_NONEXISTENT).getAnalysis()))
+ Mockito.when(serverView.getTimeline(new
TableDataSource(DATASOURCE_NONEXISTENT)))
.thenReturn(Optional.empty());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index 7f0e95ce08d..c9e5822aead 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -50,7 +50,7 @@ import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
import org.apache.druid.server.initialization.ServerConfig;
@@ -388,11 +388,10 @@ public class SingleTaskBackgroundRunner implements
TaskRunner, QuerySegmentWalke
QueryRunner<T> queryRunner = null;
if (runningItem != null) {
- final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
final Task task = runningItem.getTask();
+ final TableDataSource queryTable =
query.getDataSourceAnalysis().getBaseTableDataSource();
- if (analysis.getBaseTableDataSource().isPresent()
- &&
task.getDataSource().equals(analysis.getBaseTableDataSource().get().getName()))
{
+ if (task.getDataSource().equals(queryTable.getName())) {
final QueryRunner<T> taskQueryRunner = task.getQueryRunner(query);
if (taskQueryRunner != null) {
diff --git a/processing/src/main/java/org/apache/druid/query/Queries.java
b/processing/src/main/java/org/apache/druid/query/Queries.java
index e3262872d7a..006f9028aac 100644
--- a/processing/src/main/java/org/apache/druid/query/Queries.java
+++ b/processing/src/main/java/org/apache/druid/query/Queries.java
@@ -168,9 +168,7 @@ public class Queries
final DataSourceAnalysis analysis = retDataSource.getAnalysis();
// Sanity check: query must be based on a single table.
- if (!analysis.getBaseTableDataSource().isPresent()) {
- throw new ISE("Unable to apply specific segments to non-table-based
dataSource[%s]", query.getDataSource());
- }
+ analysis.getBaseTableDataSource();
if (analysis.getBaseQuerySegmentSpec().isPresent()
&& !analysis.getBaseQuerySegmentSpec().get().equals(new
MultipleSpecificSegmentSpec(descriptors))) {
diff --git
a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
index 13416b22d1f..5b9ad035d84 100644
--- a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
@@ -184,7 +184,7 @@ public class RestrictedDataSource implements DataSource
@Override
public DataSourceAnalysis getAnalysis()
{
- return new DataSourceAnalysis(this, null, null, ImmutableList.of());
+ return base.getAnalysis();
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 73148641c8d..7c86257e66f 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -19,6 +19,7 @@
package org.apache.druid.query.planning;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
@@ -111,23 +112,19 @@ public class DataSourceAnalysis
}
/**
- * Returns the concrete base table.
- * <ul>
- * <li>If {@link #baseDataSource} is a {@link TableDataSource}, returns
itself.
- * <li>If {@link #baseDataSource} is a {@link RestrictedDataSource},
returns {@link RestrictedDataSource#getBase()}.
- * <li>Otherwise, returns an empty Optional.
- *</ul>
- * Note that this can return empty even if {@link
#isConcreteAndTableBased()} is true. This happens if the base
- * datasource is a {@link UnionDataSource} or {@link UnnestDataSource}.
+ * Unwraps the {@link #getBaseDataSource()} if its a {@link TableDataSource}.
+ *
+ * @throws An error of type {@link DruidException.Category#DEFENSIVE} if the
{@link BaseDataSource} is not a table.
+ *
+ * note that this may not be true even {@link #isConcreteAndTableBased()} is
true - in cases when the base
+ * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
*/
- public Optional<TableDataSource> getBaseTableDataSource()
+ public TableDataSource getBaseTableDataSource()
{
if (baseDataSource instanceof TableDataSource) {
- return Optional.of((TableDataSource) baseDataSource);
- } else if (baseDataSource instanceof RestrictedDataSource) {
- return Optional.of(((RestrictedDataSource) baseDataSource).getBase());
+ return (TableDataSource) baseDataSource;
} else {
- return Optional.empty();
+ throw DruidException.defensive("Base dataSource[%s] is not a table!",
baseDataSource);
}
}
diff --git a/processing/src/test/java/org/apache/druid/query/QueriesTest.java
b/processing/src/test/java/org/apache/druid/query/QueriesTest.java
index 444e67e2e46..add02962b31 100644
--- a/processing/src/test/java/org/apache/druid/query/QueriesTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueriesTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.query;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -33,25 +34,21 @@ import
org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
-import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.JoinType;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
/**
*
*/
public class QueriesTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
@Test
public void testVerifyAggregations()
{
@@ -328,10 +325,11 @@ public class QueriesTest
.granularity(Granularities.ALL)
.build();
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage("Unable to apply specific segments to
non-table-based dataSource");
-
- final Query<Result<TimeseriesResultValue>> ignored =
Queries.withSpecificSegments(query, descriptors);
+ DruidException e = assertThrows(
+ DruidException.class,
+ () -> Queries.withSpecificSegments(query, descriptors)
+ );
+ Assert.assertEquals("Base
dataSource[LookupDataSource{lookupName='lookyloo'}] is not a table!",
e.getMessage());
}
@Test
diff --git
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 42661431ce9..2eaa2ca7f69 100644
---
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.query.planning;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -75,7 +76,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -93,8 +94,8 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
- Assert.assertEquals(RESTRICTED_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -104,6 +105,36 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isBaseColumn("foo"));
}
+ @Test
+ public void testRestrictedInJoin()
+ {
+ JoinDataSource ds = join(
+ RESTRICTED_FOO,
+ LOOKUP_LOOKYLOO,
+ "1.",
+ JoinType.INNER
+ );
+
+ final DataSourceAnalysis analysis = ds.getAnalysis();
+
+ Assert.assertTrue(analysis.isConcreteBased());
+ Assert.assertTrue(analysis.isTableBased());
+ Assert.assertTrue(analysis.isConcreteAndTableBased());
+ /**
+ * The right expectation would be TABLE_FOO.
+ * However right now MSQ wierdly depends on join identifying
RestrictedDataSource as a non-vertex boundary.
+ * That should be fixed when this test will be fixed.
+ */
+ Assert.assertEquals(RESTRICTED_FOO, analysis.getBaseDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
+ Assert.assertFalse(analysis.isGlobal());
+ Assert.assertTrue(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ }
+
@Test
public void testUnion()
{
@@ -114,7 +145,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -134,7 +165,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()),
analysis.getBaseQuery());
Assert.assertEquals(
@@ -158,7 +189,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()),
analysis.getBaseQuery());
Assert.assertEquals(
@@ -180,7 +211,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -200,7 +231,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.of(queryDataSource.getQuery()),
analysis.getBaseQuery());
Assert.assertEquals(
@@ -222,7 +253,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(INLINE, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -263,7 +294,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -314,7 +345,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
@@ -372,7 +403,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
@@ -422,7 +453,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
@@ -459,7 +490,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(
ImmutableList.of(
@@ -489,7 +520,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isConcreteBased());
Assert.assertTrue(analysis.isTableBased());
Assert.assertTrue(analysis.isConcreteAndTableBased());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getJoinBaseTableFilter());
Assert.assertEquals(Optional.of(unionDataSource),
analysis.getBaseUnionDataSource());
Assert.assertEquals(unionDataSource, analysis.getBaseDataSource());
@@ -530,7 +561,7 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isConcreteAndTableBased());
Assert.assertEquals(TABLE_FOO, analysis.getBaseDataSource());
Assert.assertEquals(TrueDimFilter.instance(),
analysis.getJoinBaseTableFilter().orElse(null));
- Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(TABLE_FOO, analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(
Optional.of(
@@ -578,7 +609,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
@@ -611,7 +642,7 @@ public class DataSourceAnalysisTest
Assert.assertFalse(analysis.isTableBased());
Assert.assertFalse(analysis.isConcreteAndTableBased());
Assert.assertEquals(LOOKUP_LOOKYLOO, analysis.getBaseDataSource());
- Assert.assertEquals(Optional.empty(), analysis.getBaseTableDataSource());
+ Assert.assertThrows(DruidException.class, () ->
analysis.getBaseTableDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 779bb7d22f4..6eaea5796f5 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -34,7 +34,6 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@@ -348,14 +347,10 @@ public class BrokerServerView implements
TimelineServerView
}
@Override
- public Optional<VersionedIntervalTimeline<String, ServerSelector>>
getTimeline(final DataSourceAnalysis analysis)
+ public Optional<VersionedIntervalTimeline<String, ServerSelector>>
getTimeline(final TableDataSource dataSource)
{
- final TableDataSource table =
- analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle base datasource:
%s", analysis.getBaseDataSource()));
-
synchronized (lock) {
- return Optional.ofNullable(timelines.get(table.getName()));
+ return Optional.ofNullable(timelines.get(dataSource.getName()));
}
}
diff --git
a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 05ae5bcdf42..7cdc68091fa 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -332,7 +332,7 @@ public class CachingClusteredClient implements
QuerySegmentWalker
)
{
final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline = serverView.getTimeline(
- dataSourceAnalysis
+ dataSourceAnalysis.getBaseTableDataSource()
);
if (!maybeTimeline.isPresent()) {
return new ClusterQueryResult<>(Sequences.empty(), 0);
diff --git a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
index 72ad84b4e61..a5bf9c4183a 100644
--- a/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
+++ b/server/src/main/java/org/apache/druid/client/ServerViewUtil.java
@@ -58,7 +58,7 @@ public class ServerViewUtil
)
{
final DataSourceAnalysis analysis = datasource.getAnalysis();
- final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline = serverView.getTimeline(analysis);
+ final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline = serverView.getTimeline(analysis.getBaseTableDataSource());
if (!maybeTimeline.isPresent()) {
return Collections.emptyList();
}
diff --git
a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 9c6ee608e1f..4cad8c4a5de 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -21,7 +21,7 @@ package org.apache.druid.client;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
@@ -37,16 +37,13 @@ import java.util.concurrent.Executor;
public interface TimelineServerView extends ServerView
{
/**
- * Returns the timeline for a datasource, if it exists. The analysis object
passed in must represent a scan-based
- * datasource of a single table.
+ * Returns the timeline for a table, if it exists.
*
- * @param analysis data source analysis information
+ * @param dataSource the table
*
* @return timeline, if it exists
- *
- * @throws IllegalStateException if 'analysis' does not represent a
scan-based datasource of a single table
*/
- <T extends TimelineLookup<String, ServerSelector>> Optional<T>
getTimeline(DataSourceAnalysis analysis);
+ <T extends TimelineLookup<String, ServerSelector>> Optional<T>
getTimeline(TableDataSource dataSource);
/**
* Returns a snapshot of the current set of server metadata.
diff --git
a/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
index 39794eb7b78..1e2ad35f3a9 100644
---
a/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
+++
b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java
@@ -24,7 +24,6 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.GlobalTableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable;
@@ -74,8 +73,7 @@ public class BroadcastTableJoinableFactory implements
JoinableFactory
private Optional<ReferenceCountingIndexedTable>
getOnlyIndexedTable(DataSource dataSource)
{
GlobalTableDataSource broadcastDataSource = (GlobalTableDataSource)
dataSource;
- DataSourceAnalysis analysis = dataSource.getAnalysis();
- return segmentManager.getIndexedTables(analysis).flatMap(tables -> {
+ return segmentManager.getIndexedTables(broadcastDataSource).flatMap(tables
-> {
Iterator<ReferenceCountingIndexedTable> tableIterator =
tables.iterator();
if (!tableIterator.hasNext()) {
return Optional.empty();
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 6460f51ca57..09d07db304f 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -182,7 +182,7 @@ public class SinkQuerySegmentWalker implements
QuerySegmentWalker
final DataSourceAnalysis analysis = dataSourceFromQuery.getAnalysis();
// Sanity check: make sure the query is based on the table we're meant to
handle.
- if (!analysis.getBaseTableDataSource().filter(ds ->
dataSource.equals(ds.getName())).isPresent()) {
+ if (!analysis.getBaseTableDataSource().getName().equals(dataSource)) {
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index dc1c3b599a9..36eaf51c1c3 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -34,7 +34,6 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@@ -289,11 +288,8 @@ public class UnifiedIndexerAppenderatorsManager implements
AppenderatorsManager
@VisibleForTesting
<T> DatasourceBundle getBundle(final Query<T> query)
{
- final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
-
- final TableDataSource table =
- analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
query.getDataSource()));
+ final DataSourceAnalysis analysis = query.getDataSourceAnalysis();
+ final TableDataSource table = analysis.getBaseTableDataSource();
final DatasourceBundle bundle;
diff --git
a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
index 7a404faf7c9..212b61990b7 100644
--- a/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
+++ b/server/src/main/java/org/apache/druid/server/ClientInfoResource.java
@@ -155,7 +155,7 @@ public class ClientInfoResource
}
final Optional<? extends TimelineLookup<String, ServerSelector>>
maybeTimeline =
- timelineServerView.getTimeline((new
TableDataSource(dataSourceName)).getAnalysis());
+ timelineServerView.getTimeline(new TableDataSource(dataSourceName));
final Optional<Iterable<TimelineObjectHolder<String, ServerSelector>>>
maybeServersLookup =
maybeTimeline.map(timeline -> timeline.lookup(theInterval));
if (!maybeServersLookup.isPresent() ||
Iterables.isEmpty(maybeServersLookup.get())) {
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java
b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index 5f9b3fc8e8e..558ecafa006 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -28,7 +28,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.PhysicalSegmentInspector;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
@@ -193,35 +192,27 @@ public class SegmentManager
/**
* Returns the timeline for a datasource, if it exists. The analysis object
passed in must represent a scan-based
* datasource of a single table.
- *
- * @param analysis data source analysis information
- *
- * @return timeline, if it exists
- *
- * @throws IllegalStateException if 'analysis' does not represent a
scan-based datasource of a single table
*/
- public Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<VersionedIntervalTimeline<String, ReferenceCountingSegment>>
getTimeline(TableDataSource dataSource)
{
- final TableDataSource tableDataSource = getTableDataSource(analysis);
- return
Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline);
+ return
Optional.ofNullable(dataSources.get(dataSource.getName())).map(DataSourceState::getTimeline);
}
/**
* Returns the collection of {@link IndexedTable} for the entire timeline
(since join conditions do not currently
* consider the queries intervals), if the timeline exists for each of its
segments that are joinable.
*/
- public Optional<Stream<ReferenceCountingIndexedTable>>
getIndexedTables(DataSourceAnalysis analysis)
+ public Optional<Stream<ReferenceCountingIndexedTable>>
getIndexedTables(TableDataSource dataSource)
{
- return getTimeline(analysis).map(timeline -> {
+ return getTimeline(dataSource).map(timeline -> {
// join doesn't currently consider intervals, so just consider all
segments
final Stream<ReferenceCountingSegment> segments =
timeline.lookup(Intervals.ETERNITY)
.stream()
.flatMap(x ->
StreamSupport.stream(x.getObject().payloads().spliterator(), false));
- final TableDataSource tableDataSource = getTableDataSource(analysis);
ConcurrentHashMap<SegmentId, ReferenceCountingIndexedTable> tables =
-
Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTablesLookup)
- .orElseThrow(() -> new ISE("Datasource %s does not have
IndexedTables", tableDataSource.getName()));
+
Optional.ofNullable(dataSources.get(dataSource.getName())).map(DataSourceState::getTablesLookup)
+ .orElseThrow(() -> new ISE("dataSource[%s] does not have
IndexedTables", dataSource.getName()));
return segments.map(segment ->
tables.get(segment.getId())).filter(Objects::nonNull);
});
}
@@ -234,12 +225,6 @@ public class SegmentManager
return false;
}
- private TableDataSource getTableDataSource(DataSourceAnalysis analysis)
- {
- return analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getBaseDataSource()));
- }
-
/**
* Load the supplied segment into page cache on bootstrap. If the segment is
already loaded, this method does not
* reload the segment into the page cache.
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
index b3297ce9a60..302c4951bc8 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ServerManager.java
@@ -121,10 +121,10 @@ public class ServerManager implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query,
Iterable<Interval> intervals)
{
- final DataSourceAnalysis analysis = query.getDataSource().getAnalysis();
+ final DataSourceAnalysis analysis = query.getDataSourceAnalysis();
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
final Optional<VersionedIntervalTimeline<String,
ReferenceCountingSegment>> maybeTimeline =
- segmentManager.getTimeline(analysis);
+ segmentManager.getTimeline(analysis.getBaseTableDataSource());
if (maybeTimeline.isPresent()) {
timeline = maybeTimeline.get();
@@ -182,7 +182,7 @@ public class ServerManager implements QuerySegmentWalker
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline;
final Optional<VersionedIntervalTimeline<String,
ReferenceCountingSegment>> maybeTimeline =
- segmentManager.getTimeline(analysis);
+ segmentManager.getTimeline(analysis.getBaseTableDataSource());
// Make sure this query type can handle the subquery, if present.
if ((dataSourceFromQuery instanceof QueryDataSource)
diff --git
a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
index 33b10a8b391..0afde542494 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java
@@ -112,7 +112,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup<String, ServerSelector> timeline =
brokerServerView.getTimeline(
- (new TableDataSource("test_broker_server_view")).getAnalysis()
+ new TableDataSource("test_broker_server_view")
).get();
List<TimelineObjectHolder<String, ServerSelector>> serverLookupRes =
timeline.lookup(intervals);
Assert.assertEquals(1, serverLookupRes.size());
@@ -174,7 +174,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
- (new TableDataSource("test_broker_server_view")).getAnalysis()
+ new TableDataSource("test_broker_server_view")
).get();
assertValues(
Arrays.asList(
@@ -197,7 +197,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(4);
timeline = brokerServerView.getTimeline(
- (new TableDataSource("test_broker_server_view")).getAnalysis()
+ new TableDataSource("test_broker_server_view")
).get();
assertValues(
Arrays.asList(
@@ -276,7 +276,7 @@ public class BrokerServerViewTest extends CuratorTestBase
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));
TimelineLookup timeline = brokerServerView.getTimeline(
- (new TableDataSource("test_broker_server_view")).getAnalysis()
+ new TableDataSource("test_broker_server_view")
).get();
assertValues(
@@ -306,7 +306,7 @@ public class BrokerServerViewTest extends CuratorTestBase
segmentRemovedLatch = new CountDownLatch(5);
timeline = brokerServerView.getTimeline(
- (new TableDataSource("test_broker_server_view")).getAnalysis()
+ new TableDataSource("test_broker_server_view")
).get();
// expect same set of segments as before
@@ -362,7 +362,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline =
brokerServerView.getTimeline(
- (new TableDataSource(segment1.getDataSource())).getAnalysis()
+ new TableDataSource(segment1.getDataSource())
).get();
// Verify that the timeline has no entry for the interval of segment 1
@@ -422,7 +422,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline =
brokerServerView.getTimeline(
- (new TableDataSource(segment1.getDataSource())).getAnalysis()
+ new TableDataSource(segment1.getDataSource())
).get();
// Verify that the timeline has no entry for the interval of segment 1
@@ -484,7 +484,7 @@ public class BrokerServerViewTest extends CuratorTestBase
// Get the timeline for the datasource
TimelineLookup<String, ServerSelector> timeline =
brokerServerView.getTimeline(
- (new TableDataSource(segment1.getDataSource())).getAnalysis()
+ new TableDataSource(segment1.getDataSource())
).get();
// Verify that the timeline has no entry for the interval of segment 1
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 7e518e328dd..27fb1b49cc5 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -40,9 +40,9 @@ import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -238,7 +238,7 @@ public class CachingClusteredClientFunctionalityTest
}
@Override
- public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource table)
{
return Optional.of(timeline);
}
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 26fa73c3897..db9f2ae75d7 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -74,6 +74,7 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -92,7 +93,6 @@ import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.ordering.StringComparators;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.search.SearchHit;
import org.apache.druid.query.search.SearchQuery;
import org.apache.druid.query.search.SearchQueryConfig;
@@ -2614,7 +2614,7 @@ public class CachingClusteredClientTest
}
@Override
- public Optional<VersionedIntervalTimeline<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<VersionedIntervalTimeline<String, ServerSelector>>
getTimeline(TableDataSource analysis)
{
return Optional.of(timeline);
}
diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
index ef31e5f8a76..523bba766ba 100644
--- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java
+++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java
@@ -32,7 +32,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
@@ -123,12 +122,8 @@ public class SimpleServerView implements TimelineServerView
}
@Override
- public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource table)
{
- final TableDataSource table =
- analysis.getBaseTableDataSource()
- .orElseThrow(() -> new ISE("Cannot handle datasource: %s",
analysis.getBaseDataSource()));
-
return Optional.ofNullable(timelines.get(table.getName()));
}
diff --git
a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
index 534b58daae4..d7f2d98e8c4 100644
--- a/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientInfoResourceTest.java
@@ -30,8 +30,8 @@ import
org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
-import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.timeline.DataSegment;
@@ -129,7 +129,7 @@ public class ClientInfoResourceTest
EasyMock.expect(serverInventoryView.getInventory()).andReturn(ImmutableList.of(server)).anyTimes();
timelineServerView = EasyMock.createMock(TimelineServerView.class);
-
EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(DataSourceAnalysis.class)))
+
EasyMock.expect(timelineServerView.getTimeline(EasyMock.anyObject(TableDataSource.class)))
.andReturn((Optional) Optional.of(timeline));
EasyMock.replay(serverInventoryView, timelineServerView);
@@ -223,7 +223,7 @@ public class ClientInfoResourceTest
"2015-02-02T09:00:00.000Z/2015-02-06T23:00:00.000Z",
"true"
);
-
+
Map<String, Object> expected = ImmutableMap.of(
"2015-02-02T09:00:00.000Z/2015-02-03T00:00:00.000Z",
ImmutableMap.of(KEY_DIMENSIONS, ImmutableSet.of("d1"), KEY_METRICS,
ImmutableSet.of("m1")),
diff --git
a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 56aee9fe772..917cee101af 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -323,7 +323,7 @@ public class SegmentManagerTest
{
Assert.assertEquals(
Optional.empty(),
- segmentManager.getTimeline((new
TableDataSource("nonExisting")).getAnalysis())
+ segmentManager.getTimeline((new TableDataSource("nonExisting")))
);
}
diff --git
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 50fc1f20f55..8c3db1ddd88 100644
---
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -119,7 +119,7 @@ public class TestClusterQuerySegmentWalker implements
QuerySegmentWalker
throw new ISE("Cannot handle datasource: %s",
queryPlus.getQuery().getDataSource());
}
- final String dataSourceName =
analysis.getBaseTableDataSource().get().getName();
+ final String dataSourceName =
analysis.getBaseTableDataSource().getName();
FunctionalIterable<SegmentDescriptor> segmentDescriptors =
FunctionalIterable
.create(intervals)
@@ -145,7 +145,7 @@ public class TestClusterQuerySegmentWalker implements
QuerySegmentWalker
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
- final String dataSourceName =
analysis.getBaseTableDataSource().get().getName();
+ final String dataSourceName = analysis.getBaseTableDataSource().getName();
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index eb7b3447171..84b11f1a729 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -553,7 +553,7 @@ public class ServerManagerTest
final Interval interval = Intervals.of("P1d/2011-04-01");
final SearchQuery query = searchQuery("test", interval, Granularities.ALL);
final Optional<VersionedIntervalTimeline<String,
ReferenceCountingSegment>> maybeTimeline = segmentManager
- .getTimeline(query.getDataSource().getAnalysis());
+ .getTimeline(query.getDataSourceAnalysis().getBaseTableDataSource());
Assert.assertTrue(maybeTimeline.isPresent());
final List<TimelineObjectHolder<String, ReferenceCountingSegment>> holders
= maybeTimeline.get().lookup(interval);
final List<SegmentDescriptor> closedSegments = new ArrayList<>();
diff --git
a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
index 7966f81cebe..4c6947de51d 100644
---
a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java
@@ -56,9 +56,10 @@ import org.mockito.stubbing.Answer;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
+
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -178,7 +179,7 @@ public class MetadataResourceTest
Mockito.doReturn(null).when(coordinator).getReplicationFactor(realTimeSegments[0].getId());
Mockito.doReturn(null).when(coordinator).getReplicationFactor(realTimeSegments[1].getId());
- Map<SegmentId, AvailableSegmentMetadata> availableSegments = new
HashMap<>();
+ Map<SegmentId, AvailableSegmentMetadata> availableSegments = new
LinkedHashMap<>();
availableSegments.put(
segments[0].getId(),
AvailableSegmentMetadata.builder(
@@ -422,7 +423,7 @@ public class MetadataResourceTest
public void testGetDataSourceInformation()
{
CoordinatorSegmentMetadataCache coordinatorSegmentMetadataCache =
Mockito.mock(CoordinatorSegmentMetadataCache.class);
- Map<String, DataSourceInformation> dataSourceInformationMap = new
HashMap<>();
+ Map<String, DataSourceInformation> dataSourceInformationMap = new
LinkedHashMap<>();
dataSourceInformationMap.put(
DATASOURCE1,
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index 220ee816a54..eb92a090d62 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -227,7 +227,7 @@ public class BrokerSegmentMetadataCacheConcurrencyTest
extends BrokerSegmentMeta
for (int i = 0; i < 1000; i++) {
boolean hasTimeline = exec.submit(
- () -> serverView.getTimeline((new
TableDataSource(DATASOURCE)).getAnalysis())
+ () -> serverView.getTimeline((new TableDataSource(DATASOURCE)))
.isPresent()
).get(100, TimeUnit.MILLISECONDS);
Assert.assertTrue(hasTimeline);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
index 58990e80661..42aa4e491cb 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
@@ -28,7 +28,7 @@ import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@@ -91,7 +91,7 @@ public class TestTimelineServerView implements
TimelineServerView
}
@Override
- public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(DataSourceAnalysis analysis)
+ public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource table)
{
throw new UnsupportedOperationException();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]