This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ea102d19b9b Enable querying policy-enabled table in MSQ, and use
RestrictedDataSource as a base in DataSourceAnalysis. (#17666)
ea102d19b9b is described below
commit ea102d19b9baa6f9506aa58df50c9ab97fb0af00
Author: Cece Mei <[email protected]>
AuthorDate: Tue Feb 18 21:43:46 2025 -0800
Enable querying policy-enabled table in MSQ, and use RestrictedDataSource
as a base in DataSourceAnalysis. (#17666)
In DataSourceAnalysis, getBaseTableDataSource can now return the base of a
RestrictedDataSource. This is a more robust solution than using the underlying
table as the base.
MSQTaskQueryMaker now adds policies to the query instead of throwing a
permission error.
DataSourcePlan can handle RestrictedDataSource.
Added a new class, RestrictedInputNumberDataSource, which wraps an
InputNumberDataSource with a policy; its SegmentMapFn can be used to create a
RestrictedSegment.
In RunWorkOrder, made a few refactors to clarify the code, with no
behavior change. In ShufflePipelineBuilder.build(), it was not clear before that
the channel future should only be returned when the resultFuture is ready.
Also, the sanity check is moved to OutputChannels.
Added tests in MSQSelectTest, MSQReplaceTest, MSQInsertTest,
MSQExportTest.
---
extensions-core/multi-stage-query/README.md | 2 +-
.../org/apache/druid/msq/exec/RunWorkOrder.java | 40 ++------
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 12 +--
.../apache/druid/msq/querykit/DataSourcePlan.java | 33 ++++++-
.../druid/msq/querykit/InputNumberDataSource.java | 9 +-
...e.java => RestrictedInputNumberDataSource.java} | 57 +++++++----
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 4 -
.../org/apache/druid/msq/exec/MSQExportTest.java | 107 ++++++++++++++++-----
.../org/apache/druid/msq/exec/MSQInsertTest.java | 36 +++++++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 80 ++++++++++++++-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 105 ++++++++++++++++++--
.../RestrictedInputNumberDataSourceTest.java | 79 ++++++---------
.../org/apache/druid/msq/test/MSQTestBase.java | 35 +++++--
.../druid/frame/processor/OutputChannels.java | 21 ++++
.../java/org/apache/druid/query/DataSource.java | 3 +-
.../org/apache/druid/query/JoinDataSource.java | 6 +-
.../apache/druid/query/RestrictedDataSource.java | 12 ++-
.../druid/query/planning/DataSourceAnalysis.java | 14 ++-
.../druid/frame/processor/OutputChannelsTest.java | 24 +++++
.../org/apache/druid/query/DataSourceTest.java | 17 +++-
.../org/apache/druid/query/JoinDataSourceTest.java | 10 +-
.../druid/query/RestrictedDataSourceTest.java | 9 +-
.../query/planning/DataSourceAnalysisTest.java | 25 +++++
.../apache/druid/server/QueryLifecycleTest.java | 4 +-
.../server/TestClusterQuerySegmentWalker.java | 5 +-
.../planner/querygen/DruidQueryGenerator.java | 3 +-
.../sql/calcite/rel/DruidCorrelateUnnestRel.java | 6 +-
.../druid/sql/calcite/rel/DruidJoinQueryRel.java | 6 +-
.../druid/sql/calcite/rel/DruidOuterQueryRel.java | 6 +-
.../apache/druid/sql/calcite/rel/DruidQuery.java | 5 +-
.../druid/sql/calcite/rel/DruidQueryRel.java | 3 +-
.../sql/calcite/rel/DruidUnionDataSourceRel.java | 6 +-
.../druid/sql/calcite/rel/PartialDruidQuery.java | 12 ++-
.../druid/sql/calcite/CalciteSelectQueryTest.java | 12 ++-
.../druid/sql/calcite/util/CalciteTests.java | 12 +--
35 files changed, 603 insertions(+), 217 deletions(-)
diff --git a/extensions-core/multi-stage-query/README.md
b/extensions-core/multi-stage-query/README.md
index 491be586d8b..5b00da45f0b 100644
--- a/extensions-core/multi-stage-query/README.md
+++ b/extensions-core/multi-stage-query/README.md
@@ -131,7 +131,7 @@ Package `org.apache.druid.msq.sql` contains code related to
integration with Dru
Main classes:
--
[SqlTaskResource](src/main/java/org/apache/druid/msq/counters/CounterTracker.java)
offers the endpoint
+-
[SqlTaskResource](src/main/java/org/apache/druid/msq/sql/resources/SqlTaskResource.java)
offers the endpoint
`/druid/v2/sql/task`, where SQL queries are executed as multi-stage query
tasks.
-
[MSQTaskSqlEngine](src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java)
is a SqlEngine implementation that
executes SQL queries as multi-stage query tasks. It is injected into the
SqlTaskResource.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 024036cdbff..01d2e925aa7 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -19,8 +19,6 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -1020,19 +1018,7 @@ public class RunWorkOrder
throw new ISE("Not initialized");
}
- return Futures.transformAsync(
- pipelineFuture,
- resultAndChannels ->
- Futures.transform(
- resultAndChannels.getResultFuture(),
- (Function<Object, OutputChannels>) input -> {
-
sanityCheckOutputChannels(resultAndChannels.getOutputChannels());
- return resultAndChannels.getOutputChannels();
- },
- Execs.directExecutor()
- ),
- Execs.directExecutor()
- );
+ return Futures.transformAsync(pipelineFuture,
ResultAndChannels::waitResultReady, Execs.directExecutor());
}
/**
@@ -1143,25 +1129,6 @@ public class RunWorkOrder
)
);
}
-
- /**
- * Verifies there is exactly one channel per partition.
- */
- private void sanityCheckOutputChannels(final OutputChannels outputChannels)
- {
- for (int partitionNumber : outputChannels.getPartitionNumbers()) {
- final List<OutputChannel> outputChannelsForPartition =
- outputChannels.getChannelsForPartition(partitionNumber);
-
- Preconditions.checkState(partitionNumber >= 0, "Expected
partitionNumber >= 0, but got [%s]", partitionNumber);
- Preconditions.checkState(
- outputChannelsForPartition.size() == 1,
- "Expected one channel for partition [%s], but got [%s]",
- partitionNumber,
- outputChannelsForPartition.size()
- );
- }
- }
}
private static class ResultAndChannels<T>
@@ -1187,6 +1154,11 @@ public class RunWorkOrder
{
return outputChannels;
}
+
+ public ListenableFuture<OutputChannels> waitResultReady()
+ {
+ return Futures.transform(resultFuture, unused ->
outputChannels.verifySingleChannel(), Execs.directExecutor());
+ }
}
private interface ExceptionalFunction<T, R>
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 5d19c899b10..c7b1471380c 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -103,7 +103,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
private final AtomicReference<State> state = new
AtomicReference<>(State.NEW);
private final AtomicBoolean cancelTasksOnStop = new AtomicBoolean();
- // Set by launchTasksIfNeeded.
+ // Set by launchWorkersIfNeeded.
@GuardedBy("taskIds")
private int desiredTaskCount = 0;
@@ -467,13 +467,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
);
taskTrackers.put(task.getId(), new TaskTracker(i, task));
- workerToTaskIds.compute(i, (workerId, taskIds) -> {
- if (taskIds == null) {
- taskIds = new ArrayList<>();
- }
- taskIds.add(task.getId());
- return taskIds;
- });
+ workerToTaskIds.computeIfAbsent(i, (unused) -> (new
ArrayList<>())).add(task.getId());
FutureUtils.getUnchecked(overlordClient.runTask(task.getId(), task),
true);
@@ -777,7 +771,7 @@ public class MSQWorkerTaskLauncher implements
RetryCapableWorkerManager
} else {
// wait on taskIds so we can wake up early if needed.
synchronized (taskIds) {
- // desiredTaskCount is set by launchTasksIfNeeded, and
acknowledgedDesiredTaskCount is set by mainLoop when
+ // desiredTaskCount is set by launchWorkersIfNeeded, and
acknowledgedDesiredTaskCount is set by mainLoop when
// it acknowledges a new target. If these are not equal, do another
run immediately and launch more tasks.
if (acknowledgedDesiredTaskCount == desiredTaskCount) {
taskIds.wait(sleepMillis);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index d20adc1e006..341d5205ff6 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -52,6 +52,7 @@ import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
@@ -165,6 +166,14 @@ public class DataSourcePlan
filterFields,
broadcast
);
+ } else if (dataSource instanceof RestrictedDataSource) {
+ return forRestricted(
+ (RestrictedDataSource) dataSource,
+ querySegmentSpecIntervals(querySegmentSpec),
+ filter,
+ filterFields,
+ broadcast
+ );
} else if (dataSource instanceof ExternalDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forExternal((ExternalDataSource) dataSource, broadcast);
@@ -329,7 +338,7 @@ public class DataSourcePlan
/**
* Checks if the sortMerge algorithm can execute a particular join condition.
- *
+ * <p>
* One check: join condition on two tables "table1" and "table2" is of the
form
* table1.columnA = table2.columnA && table1.columnB = table2.columnB && ....
*/
@@ -365,6 +374,24 @@ public class DataSourcePlan
);
}
+ private static DataSourcePlan forRestricted(
+ final RestrictedDataSource dataSource,
+ final List<Interval> intervals,
+ @Nullable final DimFilter filter,
+ @Nullable final Set<String> filterFields,
+ final boolean broadcast
+ )
+ {
+ return new DataSourcePlan(
+ (broadcast && dataSource.isGlobal())
+ ? dataSource
+ : new RestrictedInputNumberDataSource(0, dataSource.getPolicy()),
+ Collections.singletonList(new
TableInputSpec(dataSource.getBase().getName(), intervals, filter,
filterFields)),
+ broadcast ? IntOpenHashSet.of(0) : IntSets.emptySet(),
+ null
+ );
+ }
+
private static DataSourcePlan forExternal(
final ExternalDataSource dataSource,
final boolean broadcast
@@ -764,10 +791,10 @@ public class DataSourcePlan
/**
* Verify that the provided {@link QuerySegmentSpec} is a {@link
MultipleIntervalSegmentSpec} with
* interval {@link Intervals#ETERNITY}. If not, throw an {@link
UnsupportedOperationException}.
- *
+ * <p>
* Anywhere this appears is a place that we do not support using the
"intervals" parameter of a query
* (i.e., {@link org.apache.druid.query.BaseQuery#getQuerySegmentSpec()})
for time filtering.
- *
+ * <p>
* We don't need to support this for anything that is not {@link
DataSourceAnalysis#isTableBased()}, because
* the SQL layer avoids "intervals" in other cases. See
* {@link
org.apache.druid.sql.calcite.rel.DruidQuery#canUseIntervalFiltering(DataSource)}.
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
index 05560796435..9cb73c22d24 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
@@ -37,10 +37,11 @@ import java.util.function.Function;
/**
* Represents an input number, i.e., a positional index into
* {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}.
- *
- * Used by {@link DataSourcePlan} to note which inputs correspond to which
datasources in the query being planned.
- *
- * Used by {@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast
inputs with the correct datasources in a
+ * <p>
+ * Used by
+ * <ul>
+ * <li>{@link DataSourcePlan}, to note which inputs correspond to which
datasources in the query being planned.
+ * <li>{@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast
inputs with the correct datasources in a
* join tree.
*/
@JsonTypeName("inputNumber")
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSource.java
similarity index 71%
copy from
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
copy to
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSource.java
index 05560796435..a7a00725846 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/InputNumberDataSource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSource.java
@@ -22,10 +22,13 @@ package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.DataSourceAnalysis;
+import org.apache.druid.query.policy.Policy;
+import org.apache.druid.segment.RestrictedSegment;
import org.apache.druid.segment.SegmentReference;
import java.util.Collections;
@@ -36,22 +39,40 @@ import java.util.function.Function;
/**
* Represents an input number, i.e., a positional index into
- * {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}.
- *
- * Used by {@link DataSourcePlan} to note which inputs correspond to which
datasources in the query being planned.
- *
- * Used by {@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast
inputs with the correct datasources in a
+ * {@link org.apache.druid.msq.kernel.StageDefinition#getInputSpecs()}, with
policy restriction.
+ * <p>
+ * Used by
+ * <ul>
+ * <li>{@link DataSourcePlan}, to note which inputs correspond to which
datasources in the query being planned.
+ * <li>{@link BroadcastJoinSegmentMapFnProcessor} to associate broadcast
inputs with the correct datasources in a
* join tree.
*/
-@JsonTypeName("inputNumber")
-public class InputNumberDataSource implements DataSource
+@JsonTypeName("restrictedInputNumber")
+public class RestrictedInputNumberDataSource implements DataSource
{
private final int inputNumber;
+ private final Policy policy;
@JsonCreator
- public InputNumberDataSource(@JsonProperty("inputNumber") int inputNumber)
+ public RestrictedInputNumberDataSource(
+ @JsonProperty("inputNumber") int inputNumber,
+ @JsonProperty("policy") Policy policy
+ )
{
this.inputNumber = inputNumber;
+ this.policy = Preconditions.checkNotNull(policy, "Policy can't be null");
+ }
+
+ @JsonProperty
+ public int getInputNumber()
+ {
+ return inputNumber;
+ }
+
+ @JsonProperty
+ public Policy getPolicy()
+ {
+ return policy;
}
@Override
@@ -91,14 +112,13 @@ public class InputNumberDataSource implements DataSource
@Override
public boolean isConcrete()
{
- // InputNumberDataSource represents InputSpecs, which are scannable via
Segment adapters.
return true;
}
@Override
public Function<SegmentReference, SegmentReference>
createSegmentMapFunction(Query query)
{
- return Function.identity();
+ return baseSegment -> new RestrictedSegment(baseSegment, policy);
}
@Override
@@ -119,12 +139,6 @@ public class InputNumberDataSource implements DataSource
return new DataSourceAnalysis(this, null, null, Collections.emptyList());
}
- @JsonProperty
- public int getInputNumber()
- {
- return inputNumber;
- }
-
@Override
public boolean equals(Object o)
{
@@ -134,21 +148,22 @@ public class InputNumberDataSource implements DataSource
if (o == null || getClass() != o.getClass()) {
return false;
}
- InputNumberDataSource that = (InputNumberDataSource) o;
- return inputNumber == that.inputNumber;
+ RestrictedInputNumberDataSource that = (RestrictedInputNumberDataSource) o;
+ return inputNumber == that.inputNumber && policy.equals(that.policy);
}
@Override
public int hashCode()
{
- return Objects.hash(inputNumber);
+ return Objects.hash(inputNumber, policy);
}
@Override
public String toString()
{
- return "InputNumberDataSource{" +
+ return "RestrictedInputNumberDataSource{" +
"inputNumber=" + inputNumber +
- '}';
+ ", policy=" + policy + "}";
+
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 9f69396edcf..5462b991737 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -54,7 +54,6 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
@@ -117,9 +116,6 @@ public class MSQTaskQueryMaker implements QueryMaker
@Override
public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
- if
(!plannerContext.getAuthorizationResult().allowAccessWithNoRestriction()) {
- throw new
ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage());
- }
Hook.QUERY_PLAN.run(druidQuery.getQuery());
plannerContext.dispatchHook(DruidHook.NATIVE_PLAN, druidQuery.getQuery());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
index dde587c0c04..559c9bf4f77 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
@@ -22,12 +22,15 @@ package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Assert;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.BufferedReader;
import java.io.File;
@@ -35,6 +38,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,8 +47,19 @@ import java.util.Objects;
public class MSQExportTest extends MSQTestBase
{
- @Test
- public void testExport() throws IOException
+
+ public static Collection<Object[]> data()
+ {
+ Object[][] data = new Object[][]{
+ {DEFAULT, DEFAULT_MSQ_CONTEXT},
+ {SUPERUSER, SUPERUSER_MSQ_CONTEXT}
+ };
+ return Arrays.asList(data);
+ }
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExport(String unusedContextName, Map<String, Object>
context) throws IOException
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -55,7 +71,7 @@ public class MSQExportTest extends MSQTestBase
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
- .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
@@ -74,8 +90,9 @@ public class MSQExportTest extends MSQTestBase
);
}
- @Test
- public void testExport2() throws IOException
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExport2(String unusedContextName, Map<String, Object>
context) throws IOException
{
RowSignature rowSignature = RowSignature.builder()
.add("dim1", ColumnType.STRING)
@@ -86,7 +103,7 @@ public class MSQExportTest extends MSQTestBase
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
- .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
@@ -108,8 +125,48 @@ public class MSQExportTest extends MSQTestBase
verifyManifestFile(exportDir, ImmutableList.of(resultFile));
}
- @Test
- public void testNumberOfRowsPerFile()
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExportRestricted(String unusedContextName, Map<String,
Object> context) throws IOException
+ {
+ // Set expected results based on query's end user
+ boolean isSuperUser =
context.get(MSQTaskQueryMaker.USER_KEY).equals(CalciteTests.TEST_SUPERUSER_NAME);
+ List<String> expectedResultRows = isSuperUser
+ ? Arrays.asList("m1", "1.0", "2.0",
"3.0", "4.0", "5.0", "6.0")
+ : Arrays.asList("m1", "6.0");
+ // Set common expected results (not relevant to query's end user)
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("m1", ColumnType.FLOAT)
+ .build();
+
+ File exportDir = newTempFolder("export");
+ final String sql = StringUtils.format(
+ "insert into extern(local(exportPath=>'%s')) as csv select m1 from
restrictedDatasource_m1_is_6",
+ exportDir.getAbsolutePath()
+ );
+
+ testIngestQuery().setSql(sql)
+ .setExpectedDataSource("restrictedDatasource_m1_is_6")
+ .setQueryContext(context)
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedSegments(ImmutableSet.of())
+ .setExpectedResultRows(ImmutableList.of())
+ .verifyResults();
+
+ Assert.assertEquals(
+ 2, // result file and manifest file
+ Objects.requireNonNull(exportDir.listFiles()).length
+ );
+
+ File resultFile = new File(exportDir,
"query-test-query-worker0-partition0.csv");
+ List<String> results = readResultsFromFile(resultFile);
+ Assert.assertEquals(expectedResultRows, results);
+ }
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testNumberOfRowsPerFile(String unusedContextName, Map<String,
Object> context)
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -118,7 +175,7 @@ public class MSQExportTest extends MSQTestBase
File exportDir = newTempFolder("export");
- Map<String, Object> queryContext = new HashMap<>(DEFAULT_MSQ_CONTEXT);
+ Map<String, Object> queryContext = new HashMap<>(context);
queryContext.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 1);
final String sql = StringUtils.format("insert into
extern(local(exportPath=>'%s')) as csv select cnt, dim1 from foo",
exportDir.getAbsolutePath());
@@ -137,8 +194,9 @@ public class MSQExportTest extends MSQTestBase
);
}
- @Test
- void testExportComplexColumns() throws IOException
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExportComplexColumns(String unusedContextName, Map<String,
Object> context) throws IOException
{
final RowSignature rowSignature = RowSignature.builder()
.add("__time",
ColumnType.LONG)
@@ -166,7 +224,7 @@ public class MSQExportTest extends MSQTestBase
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
- .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
@@ -187,8 +245,9 @@ public class MSQExportTest extends MSQTestBase
);
}
- @Test
- void testExportSketchColumns() throws IOException
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExportSketchColumns(String unusedContextName, Map<String,
Object> context) throws IOException
{
final RowSignature rowSignature = RowSignature.builder()
.add("__time",
ColumnType.LONG)
@@ -217,7 +276,7 @@ public class MSQExportTest extends MSQTestBase
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
- .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
@@ -238,8 +297,10 @@ public class MSQExportTest extends MSQTestBase
);
}
- @Test
- void testEmptyExport() throws IOException
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testEmptyExport(String unusedContextName, Map<String, Object>
context) throws IOException
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -256,7 +317,7 @@ public class MSQExportTest extends MSQTestBase
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
- .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setQueryContext(context)
.setExpectedRowSignature(rowSignature)
.setExpectedSegments(ImmutableSet.of())
.setExpectedResultRows(ImmutableList.of())
@@ -316,8 +377,10 @@ public class MSQExportTest extends MSQTestBase
}
}
- @Test
- public void testExportWithLimit() throws IOException
+
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testExportWithLimit(String unusedContextName, Map<String,
Object> context) throws IOException
{
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
@@ -326,7 +389,7 @@ public class MSQExportTest extends MSQTestBase
File exportDir = newTempFolder("export");
- Map<String, Object> queryContext = new HashMap<>(DEFAULT_MSQ_CONTEXT);
+ Map<String, Object> queryContext = new HashMap<>(context);
queryContext.put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 1);
final String sql = StringUtils.format("insert into
extern(local(exportPath=>'%s')) as csv select cnt, dim1 from foo limit 3",
exportDir.getAbsolutePath());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index ad020d4548f..801faac44af 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
import org.apache.druid.msq.indexing.report.MSQSegmentReport;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -62,6 +63,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
@@ -101,6 +103,7 @@ public class MSQInsertTest extends MSQTestBase
{
Object[][] data = new Object[][]{
{DEFAULT, DEFAULT_MSQ_CONTEXT},
+ {SUPERUSER, SUPERUSER_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
@@ -1603,6 +1606,39 @@ public class MSQInsertTest extends MSQTestBase
.verifyResults();
}
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testInsertOnRestricted(String contextName, Map<String, Object>
context)
+ {
+ // Set expected results based on query's end user
+ boolean isSuperUser =
context.get(MSQTaskQueryMaker.USER_KEY).equals(CalciteTests.TEST_SUPERUSER_NAME);
+ List<Object[]> expectedRows = isSuperUser ? ImmutableList.of(
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978480000000L, 6.0f}
+ ) : ImmutableList.of(new Object[]{978480000000L, 6.0f});
+ // Set common expected results (not relevant to query's end user)
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("m1", ColumnType.FLOAT)
+ .build();
+
+ testIngestQuery().setSql(
+ "insert into restrictedDatasource_m1_is_6 select
__time, m1 from restrictedDatasource_m1_is_6 where __time >= TIMESTAMP
'2001-01-01' partitioned by all")
+ .setExpectedDataSource("restrictedDatasource_m1_is_6")
+ .setQueryContext(new HashMap<>(context))
+ .setExpectedRowSignature(rowSignature)
+
.setExpectedSegments(ImmutableSet.of(SegmentId.of("restrictedDatasource_m1_is_6",
Intervals.ETERNITY, "test", 0)))
+ .setExpectedResultRows(expectedRows)
+ .setExpectedMSQSegmentReport(
+ new MSQSegmentReport(
+ NumberedShardSpec.class.getSimpleName(),
+ "Using NumberedShardSpec to generate segments
since the query is inserting rows."
+ )
+ )
+ .verifyResults();
+ }
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testCorrectNumberOfWorkersUsedAutoModeWithoutBytesLimit(String
contextName, Map<String, Object> context) throws IOException
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index d196dccedf0..bf97432d3f8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -46,6 +46,7 @@ import
org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
import org.apache.druid.msq.indexing.report.MSQSegmentReport;
+import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestTaskActionClient;
@@ -56,6 +57,7 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -100,7 +102,8 @@ public class MSQReplaceTest extends MSQTestBase
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
- {WITH_REPLACE_LOCK_AND_COMPACTION_STATE,
QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE},
+ {SUPERUSER, SUPERUSER_MSQ_CONTEXT},
+ {WITH_REPLACE_LOCK_AND_COMPACTION_STATE,
QUERY_CONTEXT_WITH_REPLACE_LOCK_AND_COMPACTION_STATE}
};
return Arrays.asList(data);
}
@@ -704,6 +707,81 @@ public class MSQReplaceTest extends MSQTestBase
.verifyResults();
}
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testReplaceOnRestricted(String contextName, Map<String, Object>
context)
+ {
+ // Set expected results based on query's end user
+ boolean isSuperUser =
context.get(MSQTaskQueryMaker.USER_KEY).equals(CalciteTests.TEST_SUPERUSER_NAME);
+ ImmutableSet<Interval> expectedTombstoneIntervals = isSuperUser
+ ? ImmutableSet.of()
+ : ImmutableSet.of(
+
Intervals.of("2001-01-01T/P1D"),
+
Intervals.of("2001-01-02T/P1D")
+ );
+ ImmutableSet<SegmentId> expectedSegments = isSuperUser ? ImmutableSet.of(
+ SegmentId.of("restrictedDatasource_m1_is_6",
Intervals.of("2001-01-01T/P1D"), "test", 0),
+ SegmentId.of("restrictedDatasource_m1_is_6",
Intervals.of("2001-01-02T/P1D"), "test", 0),
+ SegmentId.of("restrictedDatasource_m1_is_6",
Intervals.of("2001-01-03T/P1D"), "test", 0)
+ ) : ImmutableSet.of(SegmentId.of("restrictedDatasource_m1_is_6",
Intervals.of("2001-01-03T/P1D"), "test", 0));
+ ImmutableList<Object[]> expectedResultRows = isSuperUser ?
ImmutableList.of(
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978480000000L, 6.0f}
+ ) : ImmutableList.of(new Object[]{978480000000L, 6.0f});
+ // Set common expected results (not relevant to query's end user)
+ CounterSnapshotMatcher shuffleCounterSnapshotMatcher = isSuperUser
+ ?
CounterSnapshotMatcher.with()
+
.rows(1, 1, 1)
+
.frames(1, 1, 1)
+ :
CounterSnapshotMatcher.with().rows(1).frames(1);
+ CounterSnapshotMatcher inputCounterSnapshotMatcher = isSuperUser
+ ?
CounterSnapshotMatcher.with()
+
.rows(1, 1, 1)
+
.frames(1, 1, 1)
+ :
CounterSnapshotMatcher.with().rows(1).frames(1);
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("m1", ColumnType.FLOAT)
+ .build();
+
+ testIngestQuery().setSql(
+ " REPLACE INTO restrictedDatasource_m1_is_6 OVERWRITE
WHERE __time >= TIMESTAMP '2001-01-01' AND __time < TIMESTAMP '2001-01-04' "
+ + "SELECT __time, m1 "
+ + "FROM restrictedDatasource_m1_is_6 "
+ + "WHERE __time >= TIMESTAMP '2001-01-01' AND __time
< TIMESTAMP '2001-01-04' "
+ + "PARTITIONED by DAY ")
+ .setExpectedDataSource("restrictedDatasource_m1_is_6")
+
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of("2001-01-01T/P3D")))
+ .setExpectedTombstoneIntervals(expectedTombstoneIntervals)
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedSegments(expectedSegments)
+ .setExpectedResultRows(expectedResultRows)
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().totalFiles(1),
+ 0, 0, "input0"
+ )
+
.setExpectedCountersForStageWorkerChannel(shuffleCounterSnapshotMatcher, 0, 0,
"shuffle")
+
.setExpectedCountersForStageWorkerChannel(inputCounterSnapshotMatcher, 1, 0,
"input0")
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+
.with().segmentRowsProcessed(expectedSegments.size()),
+ 1, 0
+ )
+ .setExpectedLastCompactionState(
+ expectedCompactionState(
+ context,
+ Collections.emptyList(),
+ Collections.singletonList(new
FloatDimensionSchema("m1")),
+ GranularityType.DAY,
+ Intervals.of("2001-01-01T/P3D")
+ )
+ )
+ .verifyResults();
+ }
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceWithDynamicParameters(String contextName, Map<String,
Object> context)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 130de0f60aa..6ab13132162 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -44,6 +44,7 @@ import
org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.msq.test.CounterSnapshotMatcher;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -52,6 +53,7 @@ import org.apache.druid.query.JoinAlgorithm;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
@@ -68,13 +70,13 @@ import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.policy.Policy;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
-import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.filtration.Filtration;
@@ -135,7 +137,8 @@ public class MSQSelectTest extends MSQTestBase
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
{QUERY_RESULTS_WITH_DURABLE_STORAGE,
QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT},
- {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT}
+ {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT},
+ {SUPERUSER, SUPERUSER_MSQ_CONTEXT}
};
return Arrays.asList(data);
@@ -800,14 +803,37 @@ public class MSQSelectTest extends MSQTestBase
@ParameterizedTest(name = "{index}:with context {0}")
public void testSelectRestricted(String contextName, Map<String, Object>
context)
{
+ boolean isSuperUser =
context.get(MSQTaskQueryMaker.USER_KEY).equals(CalciteTests.TEST_SUPERUSER_NAME);
+ Policy policy = isSuperUser ? CalciteTests.POLICY_NO_RESTRICTION_SUPERUSER
: CalciteTests.POLICY_RESTRICTION;
+ long expectedResultRows = isSuperUser ? 6L : 1L;
+
+ final RowSignature rowSignature = RowSignature.builder().add("EXPR$0",
ColumnType.LONG).build();
+
testSelectQuery()
.setSql("select count(*) from druid.restrictedDatasource_m1_is_6")
.setQueryContext(context)
- .setExpectedExecutionErrorMatcher(CoreMatchers.allOf(
- CoreMatchers.instanceOf(ForbiddenException.class),
-
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("Unauthorized"))
- ))
- .verifyExecutionError();
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(
+ GroupByQuery.builder()
+ .setDataSource(RestrictedDataSource.create(
+
TableDataSource.create(CalciteTests.RESTRICTED_DATASOURCE),
+ policy
+ ))
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(aggregators(new
CountAggregatorFactory("a0")))
+ .setContext(context)
+ .build())
+ .columnMappings(new ColumnMappings(ImmutableList.of(new
ColumnMapping("a0", "EXPR$0"))))
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination(contextName,
context)
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build())
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedResultRows(ImmutableList.of(new
Object[]{expectedResultRows}))
+ .verifyResults();
}
@MethodSource("data")
@@ -903,6 +929,71 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
+ @MethodSource("data")
+ @ParameterizedTest(name = "{index}:with context {0}")
+ public void testJoinRestrictedWithLookup(String contextName, Map<String,
Object> context)
+ {
+ boolean isSuperUser =
context.get(MSQTaskQueryMaker.USER_KEY).equals(CalciteTests.TEST_SUPERUSER_NAME);
+ Policy policy = isSuperUser ? CalciteTests.POLICY_NO_RESTRICTION_SUPERUSER
: CalciteTests.POLICY_RESTRICTION;
+ ImmutableList<Object[]> expectedResult = isSuperUser ?
ImmutableList.of(new Object[]{"xabc", 1L}) : ImmutableList.of();
+
+ final RowSignature rowSignature =
+ RowSignature.builder()
+ .add("v", ColumnType.STRING)
+ .add("cnt", ColumnType.LONG)
+ .build();
+
+ testSelectQuery()
+ .setSql("SELECT lookyloo.v, COUNT(*) AS cnt\n"
+ + "FROM druid.restrictedDatasource_m1_is_6 as restricted LEFT
JOIN lookup.lookyloo ON restricted.dim2 = lookyloo.k\n"
+ + "WHERE lookyloo.v <> 'xa'\n"
+ + "GROUP BY lookyloo.v")
+ .setQueryContext(context)
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(
+ GroupByQuery.builder()
+ .setDataSource(
+ join(
+ RestrictedDataSource.create(
+
TableDataSource.create(CalciteTests.RESTRICTED_DATASOURCE),
+ policy
+ ),
+ new LookupDataSource("lookyloo"),
+ "j0.",
+ equalsCondition(
+
DruidExpression.ofColumn(ColumnType.STRING, "dim2"),
+
DruidExpression.ofColumn(ColumnType.STRING, "j0.k")
+ ),
+ JoinType.INNER
+ )
+ )
+
.setInterval(querySegmentSpec(Filtration.eternity()))
+ .setDimFilter(not(equality("j0.v", "xa",
ColumnType.STRING)))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(new
DefaultDimensionSpec("j0.v", "d0")))
+ .setAggregatorSpecs(aggregators(new
CountAggregatorFactory("a0")))
+ .setContext(context)
+ .build())
+ .columnMappings(
+ new ColumnMappings(
+ ImmutableList.of(
+ new ColumnMapping("d0", "v"),
+ new ColumnMapping("a0", "cnt")
+ )
+ )
+ )
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination(contextName,
context)
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build())
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedResultRows(expectedResult)
+
.setExpectedLookupLoadingSpec(LookupLoadingSpec.loadOnly(ImmutableSet.of("lookyloo")))
+ .verifyResults();
+ }
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testSubquery(String contextName, Map<String, Object> context)
diff --git
a/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSourceTest.java
similarity index 51%
copy from
processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
copy to
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSourceTest.java
index 2a51bc60053..da3307e181f 100644
---
a/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/RestrictedInputNumberDataSourceTest.java
@@ -17,12 +17,13 @@
* under the License.
*/
-package org.apache.druid.query;
+package org.apache.druid.msq.querykit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
-import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.policy.NoRestrictionPolicy;
import org.apache.druid.query.policy.RowFilterPolicy;
@@ -30,40 +31,36 @@ import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Collections;
-
-public class RestrictedDataSourceTest
+public class RestrictedInputNumberDataSourceTest
{
- private final TableDataSource fooDataSource = new TableDataSource("foo");
- private final TableDataSource barDataSource = new TableDataSource("bar");
- private final RestrictedDataSource restrictedFooDataSource =
RestrictedDataSource.create(
- fooDataSource,
+ private final RestrictedInputNumberDataSource restrictedFooDataSource = new
RestrictedInputNumberDataSource(
+ 0,
RowFilterPolicy.from(TrueDimFilter.instance())
);
- private final RestrictedDataSource restrictedBarDataSource =
RestrictedDataSource.create(
- barDataSource,
+ private final RestrictedInputNumberDataSource restrictedBarDataSource = new
RestrictedInputNumberDataSource(
+ 1,
NoRestrictionPolicy.instance()
);
@Test
public void test_creation_failWithNullPolicy()
{
- IAE e = Assert.assertThrows(IAE.class, () ->
RestrictedDataSource.create(fooDataSource, null));
- Assert.assertEquals(e.getMessage(), "Policy can't be null for
RestrictedDataSource");
+ Exception e = Assert.assertThrows(Exception.class, () -> new
RestrictedInputNumberDataSource(1, null));
+ Assert.assertEquals(e.getMessage(), "Policy can't be null");
}
@Test
public void test_getTableNames()
{
- Assert.assertEquals(Collections.singleton("foo"),
restrictedFooDataSource.getTableNames());
- Assert.assertEquals(Collections.singleton("bar"),
restrictedBarDataSource.getTableNames());
+ Assert.assertTrue(restrictedFooDataSource.getTableNames().isEmpty());
+ Assert.assertTrue(restrictedBarDataSource.getTableNames().isEmpty());
}
@Test
public void test_getChildren()
{
- Assert.assertEquals(Collections.singletonList(fooDataSource),
restrictedFooDataSource.getChildren());
- Assert.assertEquals(Collections.singletonList(barDataSource),
restrictedBarDataSource.getChildren());
+ Assert.assertTrue(restrictedFooDataSource.getChildren().isEmpty());
+ Assert.assertTrue(restrictedBarDataSource.getChildren().isEmpty());
}
@Test
@@ -89,49 +86,38 @@ public class RestrictedDataSourceTest
{
IllegalArgumentException exception = Assert.assertThrows(
IllegalArgumentException.class,
- () -> restrictedFooDataSource.withChildren(Collections.emptyList())
+ () -> restrictedFooDataSource.withChildren(ImmutableList.of(new
TableDataSource("foo")))
);
- Assert.assertEquals(exception.getMessage(), "Expected [1] child, got [0]");
+ Assert.assertEquals(exception.getMessage(), "Cannot accept children");
- IllegalArgumentException exception2 = Assert.assertThrows(
- IllegalArgumentException.class,
- () ->
restrictedFooDataSource.withChildren(ImmutableList.of(fooDataSource,
barDataSource))
- );
- Assert.assertEquals(exception2.getMessage(), "Expected [1] child, got
[2]");
-
- RestrictedDataSource newRestrictedDataSource = (RestrictedDataSource)
restrictedFooDataSource.withChildren(
- ImmutableList.of(barDataSource));
- Assert.assertEquals(newRestrictedDataSource.getBase(), barDataSource);
+ RestrictedInputNumberDataSource newRestrictedDataSource =
(RestrictedInputNumberDataSource) restrictedFooDataSource.withChildren(
+ ImmutableList.of());
+ Assert.assertTrue(newRestrictedDataSource.getChildren().isEmpty());
}
@Test
public void test_withUpdatedDataSource()
{
- RestrictedDataSource newRestrictedDataSource = (RestrictedDataSource)
restrictedFooDataSource.withUpdatedDataSource(
- new TableDataSource("bar"));
- Assert.assertEquals(newRestrictedDataSource.getBase(), barDataSource);
- }
-
- @Test
- public void test_withAnalysis()
- {
- Assert.assertEquals(restrictedFooDataSource.getAnalysis(),
fooDataSource.getAnalysis());
- Assert.assertEquals(restrictedBarDataSource.getAnalysis(),
barDataSource.getAnalysis());
+ DataSource newRestrictedDataSource =
restrictedFooDataSource.withUpdatedDataSource(restrictedBarDataSource);
+ Assert.assertEquals(newRestrictedDataSource, restrictedBarDataSource);
}
@Test
public void test_equals()
{
-
EqualsVerifier.forClass(RestrictedDataSource.class).usingGetClass().withNonnullFields("base").verify();
+ EqualsVerifier.forClass(RestrictedInputNumberDataSource.class)
+ .usingGetClass()
+ .withNonnullFields("inputNumber", "policy")
+ .verify();
}
@Test
public void test_serde_roundTrip() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- final RestrictedDataSource deserialized = (RestrictedDataSource)
jsonMapper.readValue(
+ final RestrictedInputNumberDataSource deserialized = jsonMapper.readValue(
jsonMapper.writeValueAsString(restrictedFooDataSource),
- DataSource.class
+ RestrictedInputNumberDataSource.class
);
Assert.assertEquals(restrictedFooDataSource, deserialized);
@@ -141,18 +127,17 @@ public class RestrictedDataSourceTest
public void test_deserialize_fromObject() throws Exception
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
- final RestrictedDataSource deserializedRestrictedDataSource =
jsonMapper.readValue(
-
"{\"type\":\"restrict\",\"base\":{\"type\":\"table\",\"name\":\"foo\"},\"policy\":{\"type\":\"noRestriction\"}}",
- RestrictedDataSource.class
+ final RestrictedInputNumberDataSource deserializedRestrictedDataSource =
jsonMapper.readValue(
+
"{\"type\":\"restrictedInputNumber\",\"inputNumber\":1,\"policy\":{\"type\":\"noRestriction\"}}\n",
+ RestrictedInputNumberDataSource.class
);
Assert.assertEquals(
deserializedRestrictedDataSource,
- RestrictedDataSource.create(fooDataSource,
NoRestrictionPolicy.instance())
+ restrictedBarDataSource
);
}
-
@Test
public void test_serialize() throws Exception
{
@@ -160,7 +145,7 @@ public class RestrictedDataSourceTest
final String s = jsonMapper.writeValueAsString(restrictedFooDataSource);
Assert.assertEquals(
-
"{\"type\":\"restrict\",\"base\":{\"type\":\"table\",\"name\":\"foo\"},\"policy\":{\"type\":\"row\",\"rowFilter\":{\"type\":\"true\"}}}",
+
"{\"type\":\"restrictedInputNumber\",\"inputNumber\":0,\"policy\":{\"type\":\"row\",\"rowFilter\":{\"type\":\"true\"}}}",
s
);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index ba7166f69a3..951e08d35a3 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -160,6 +160,7 @@ import
org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.sql.DirectStatement;
import org.apache.druid.sql.SqlQueryPlus;
@@ -235,6 +236,7 @@ import java.util.stream.Collectors;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
+import static
org.apache.druid.sql.calcite.util.CalciteTests.RESTRICTED_DATASOURCE;
import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
@@ -271,10 +273,16 @@ public class MSQTestBase extends BaseCalciteQueryTest
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 2)
.put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, 0)
- .put(MSQTaskQueryMaker.USER_KEY, "allowAll")
+ .put(MSQTaskQueryMaker.USER_KEY,
CalciteTests.REGULAR_USER_AUTH_RESULT.getIdentity())
.put(MultiStageQueryContext.WINDOW_FUNCTION_OPERATOR_TRANSFORMATION, true)
.build();
+ public static final Map<String, Object> SUPERUSER_MSQ_CONTEXT =
+ ImmutableMap.<String, Object>builder()
+ .putAll(DEFAULT_MSQ_CONTEXT)
+ .put(MSQTaskQueryMaker.USER_KEY,
CalciteTests.SUPER_USER_AUTH_RESULT.getIdentity())
+ .buildKeepingLast();
+
public static final Map<String, Object> DURABLE_STORAGE_MSQ_CONTEXT =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
@@ -316,6 +324,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
public static final String DURABLE_STORAGE = "durable_storage";
public static final String DEFAULT = "default";
public static final String PARALLEL_MERGE = "parallel_merge";
+ public static final String SUPERUSER = "superuser";
protected File localFileStorageDir;
protected LocalFileStorageConnector localFileStorageConnector;
@@ -647,6 +656,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
final QueryableIndex index;
switch (segmentId.getDataSource()) {
case DATASOURCE1:
+ case RESTRICTED_DATASOURCE: // RESTRICTED_DATASOURCE share the same
index as DATASOURCE1.
IncrementalIndexSchema foo1Schema = new
IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
@@ -788,14 +798,19 @@ public class MSQTestBase extends BaseCalciteQueryTest
);
}
- private String runMultiStageQuery(String query, Map<String, Object> context,
List<TypedValue> parameters)
+ private String runMultiStageQuery(
+ String query,
+ Map<String, Object> context,
+ AuthenticationResult authenticationResult,
+ List<TypedValue> parameters
+ )
{
final DirectStatement stmt = sqlStatementFactory.directStatement(
new SqlQueryPlus(
query,
context,
parameters,
- CalciteTests.REGULAR_USER_AUTH_RESULT
+ authenticationResult
)
);
@@ -888,6 +903,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
public abstract class MSQTester<Builder extends MSQTester<Builder>>
{
protected String sql = null;
+ protected AuthenticationResult authenticationResult =
CalciteTests.REGULAR_USER_AUTH_RESULT;
protected MSQControllerTask taskSpec = null;
protected Map<String, Object> queryContext = DEFAULT_MSQ_CONTEXT;
protected List<TypedValue> dynamicParameters = new ArrayList<>();
@@ -926,6 +942,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
public Builder setQueryContext(Map<String, Object> queryContext)
{
this.queryContext = queryContext;
+ if (queryContext.containsKey(MSQTaskQueryMaker.USER_KEY)
+ &&
CalciteTests.TEST_SUPERUSER_NAME.equals(queryContext.get(MSQTaskQueryMaker.USER_KEY)))
{
+ this.authenticationResult = CalciteTests.SUPER_USER_AUTH_RESULT;
+ }
return asBuilder();
}
@@ -964,7 +984,6 @@ public class MSQTestBase extends BaseCalciteQueryTest
public Builder setExpectedTombstoneIntervals(Set<Interval>
tombstoneIntervals)
{
- Preconditions.checkArgument(!tombstoneIntervals.isEmpty(), "Segments
cannot be empty");
this.expectedTombstoneIntervals = tombstoneIntervals;
return asBuilder();
}
@@ -1068,7 +1087,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
final Throwable e = Assert.assertThrows(
Throwable.class,
- () -> runMultiStageQuery(sql, queryContext, dynamicParameters)
+ () -> runMultiStageQuery(sql, queryContext, authenticationResult,
dynamicParameters)
);
assertThat(e, expectedValidationErrorMatcher);
@@ -1220,7 +1239,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
String controllerId;
if (sql != null) {
// Run the sql command.
- controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
+ controllerId = runMultiStageQuery(sql, queryContext,
authenticationResult, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1437,7 +1456,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
String controllerId;
if (sql != null) {
- controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
+ controllerId = runMultiStageQuery(sql, queryContext,
authenticationResult, dynamicParameters);
} else {
// Run the task spec directly instead.
controllerId = TEST_CONTROLLER_TASK_ID;
@@ -1479,7 +1498,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
Preconditions.checkArgument(sql == null || queryContext != null,
"queryContext cannot be null");
try {
- String controllerId = runMultiStageQuery(sql, queryContext,
dynamicParameters);
+ String controllerId = runMultiStageQuery(sql, queryContext,
authenticationResult, dynamicParameters);
if (expectedMSQFault != null || expectedMSQFaultClass != null) {
MSQErrorReport msqErrorReport = getErrorReportOrThrow(controllerId);
diff --git
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
index b4c81476cc4..e81a7a6acc9 100644
---
a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
+++
b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannels.java
@@ -19,6 +19,7 @@
package org.apache.druid.frame.processor;
+import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectSortedMap;
import it.unimi.dsi.fastutil.ints.IntSortedSet;
@@ -74,6 +75,26 @@ public class OutputChannels
return new
OutputChannels(outputChannels.stream().map(OutputChannel::readOnly).collect(Collectors.toList()));
}
+ /**
+ * Verifies there is exactly one channel per partition.
+ */
+ public OutputChannels verifySingleChannel()
+ {
+ for (int partitionNumber : getPartitionNumbers()) {
+ final List<OutputChannel> outputChannelsForPartition =
+ getChannelsForPartition(partitionNumber);
+
+ Preconditions.checkState(partitionNumber >= 0, "Expected partitionNumber
>= 0, but got [%s]", partitionNumber);
+ Preconditions.checkState(
+ outputChannelsForPartition.size() == 1,
+ "Expected one channel for partition [%s], but got [%s]",
+ partitionNumber,
+ outputChannelsForPartition.size()
+ );
+ }
+ return this;
+ }
+
/**
* Returns the set of partition numbers that appear across all channels.
*/
diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java
b/processing/src/main/java/org/apache/druid/query/DataSource.java
index 3f140ffc302..bf4966a4081 100644
--- a/processing/src/main/java/org/apache/druid/query/DataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/DataSource.java
@@ -109,7 +109,6 @@ public interface DataSource
* Returns a segment function on to how to segment should be modified.
*
* @param query the input query
- * @param cpuTimeAcc the cpu time accumulator
* @return the segment function
*/
Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query
query);
@@ -123,7 +122,7 @@ public interface DataSource
DataSource withUpdatedDataSource(DataSource newSource);
/**
- * Returns the query with an updated datasource based on the policy
restrictions on tables.
+ * Returns an updated datasource based on the policy restrictions on tables.
* <p>
* If this datasource contains no table, no changes should occur.
*
diff --git
a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
index f4ef4cc3fa8..a4679ce786a 100644
--- a/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -543,8 +543,7 @@ public class JoinDataSource implements DataSource
// can be refactored to omit these instanceof checks
while (current instanceof JoinDataSource
|| current instanceof UnnestDataSource
- || current instanceof FilteredDataSource
- || current instanceof RestrictedDataSource) {
+ || current instanceof FilteredDataSource) {
if (current instanceof JoinDataSource) {
final JoinDataSource joinDataSource = (JoinDataSource) current;
current = joinDataSource.getLeft();
@@ -561,9 +560,6 @@ public class JoinDataSource implements DataSource
} else if (current instanceof UnnestDataSource) {
final UnnestDataSource unnestDataSource = (UnnestDataSource) current;
current = unnestDataSource.getBase();
- } else if (current instanceof RestrictedDataSource) {
- final RestrictedDataSource restrictedDataSource =
(RestrictedDataSource) current;
- current = restrictedDataSource.getBase();
} else {
final FilteredDataSource filteredDataSource = (FilteredDataSource)
current;
current = filteredDataSource.getBase();
diff --git
a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
index 7f1303b4a47..13416b22d1f 100644
--- a/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
+++ b/processing/src/main/java/org/apache/druid/query/RestrictedDataSource.java
@@ -29,6 +29,7 @@ import org.apache.druid.query.policy.NoRestrictionPolicy;
import org.apache.druid.query.policy.Policy;
import org.apache.druid.segment.RestrictedSegment;
import org.apache.druid.segment.SegmentReference;
+
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -149,9 +150,13 @@ public class RestrictedDataSource implements DataSource
policy
);
}
- if (!(newPolicy.get() instanceof NoRestrictionPolicy)) {
+ if (newPolicy.get() instanceof NoRestrictionPolicy) {
+ // druid-internal calls with NoRestrictionPolicy: allow
+ } else if (newPolicy.get().equals(policy)) {
+ // same policy: allow
+ } else {
throw new ISE(
- "Multiple restrictions on table [%s]: policy [%s] and policy [%s]",
+ "Different restrictions on table [%s]: previous policy [%s] and new
policy [%s]",
base.getName(),
policy,
newPolicy.get()
@@ -179,8 +184,7 @@ public class RestrictedDataSource implements DataSource
@Override
public DataSourceAnalysis getAnalysis()
{
- final DataSource current = this.getBase();
- return current.getAnalysis();
+ return new DataSourceAnalysis(this, null, null, ImmutableList.of());
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
index 23f372c4390..73148641c8d 100644
---
a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
+++
b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java
@@ -24,6 +24,7 @@ import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.UnnestDataSource;
@@ -110,15 +111,21 @@ public class DataSourceAnalysis
}
/**
- * If {@link #getBaseDataSource()} is a {@link TableDataSource}, returns it.
Otherwise, returns an empty Optional.
- *
+ * Returns the concrete base table.
+ * <ul>
+ * <li>If {@link #baseDataSource} is a {@link TableDataSource}, returns
itself.
+ * <li>If {@link #baseDataSource} is a {@link RestrictedDataSource},
returns {@link RestrictedDataSource#getBase()}.
+ * <li>Otherwise, returns an empty Optional.
+ *</ul>
* Note that this can return empty even if {@link
#isConcreteAndTableBased()} is true. This happens if the base
- * datasource is a {@link UnionDataSource} of {@link TableDataSource}.
+ * datasource is a {@link UnionDataSource} or {@link UnnestDataSource}.
*/
public Optional<TableDataSource> getBaseTableDataSource()
{
if (baseDataSource instanceof TableDataSource) {
return Optional.of((TableDataSource) baseDataSource);
+ } else if (baseDataSource instanceof RestrictedDataSource) {
+ return Optional.of(((RestrictedDataSource) baseDataSource).getBase());
} else {
return Optional.empty();
}
@@ -218,6 +225,7 @@ public class DataSourceAnalysis
public boolean isTableBased()
{
return (baseDataSource instanceof TableDataSource
+ || baseDataSource instanceof RestrictedDataSource
|| (baseDataSource instanceof UnionDataSource &&
baseDataSource.getChildren()
.stream()
diff --git
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
index a22c7e92bfc..0940d299144 100644
---
a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
@@ -101,4 +101,28 @@ public class OutputChannelsTest
"Frame allocator is not available. The output channel might be
marked as read-only, hence memory allocator is not required."))
);
}
+
+ @Test
+ public void test_sanityCheck()
+ {
+ final OutputChannels channelsDuplicatedPartition =
OutputChannels.wrap(ImmutableList.of(
+ OutputChannel.nil(1),
+ OutputChannel.nil(1)
+ ));
+ final IllegalStateException e = Assert.assertThrows(
+ IllegalStateException.class,
+ channelsDuplicatedPartition::verifySingleChannel
+ );
+ Assert.assertEquals("Expected one channel for partition [1], but got [2]",
e.getMessage());
+
+ final OutputChannels channelsNegativePartition =
OutputChannels.wrap(ImmutableList.of(OutputChannel.nil(-1)));
+ final IllegalStateException e2 = Assert.assertThrows(
+ IllegalStateException.class,
+ channelsNegativePartition::verifySingleChannel
+ );
+ Assert.assertEquals("Expected partitionNumber >= 0, but got [-1]",
e2.getMessage());
+
+ final OutputChannels channels =
OutputChannels.wrap(ImmutableList.of(OutputChannel.nil(1),
OutputChannel.nil(2)));
+ Assert.assertEquals(channels, channels.verifySingleChannel());
+ }
}
diff --git
a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java
b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java
index cd77f14246f..25733b340f1 100644
--- a/processing/src/test/java/org/apache/druid/query/DataSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DataSourceTest.java
@@ -178,6 +178,21 @@ public class DataSourceTest
Assert.assertEquals(restrictedDataSource,
restrictedDataSource.withPolicies(noRestrictionPolicy));
}
+ @Test
+ public void testMapWithRestriction_onRestrictedDataSource_samePolicy()
+ {
+ RestrictedDataSource restrictedDataSource = RestrictedDataSource.create(
+ TableDataSource.create("table1"),
+ RowFilterPolicy.from(new NullFilter("some-column", null))
+ );
+ ImmutableMap<String, Optional<Policy>> policyMap = ImmutableMap.of(
+ "table1",
+ Optional.of(RowFilterPolicy.from(new NullFilter("some-column", null)))
+ );
+
+ Assert.assertEquals(restrictedDataSource,
restrictedDataSource.withPolicies(policyMap));
+ }
+
@Test
public void testMapWithRestriction_onRestrictedDataSource_alwaysThrows()
{
@@ -194,7 +209,7 @@ public class DataSourceTest
ISE e = Assert.assertThrows(ISE.class, () ->
restrictedDataSource.withPolicies(anotherRestrictions));
Assert.assertEquals(
- "Multiple restrictions on table [table1]: policy
[RowFilterPolicy{rowFilter=random-column IS NULL}] and policy
[RowFilterPolicy{rowFilter=some-column IS NULL}]",
+ "Different restrictions on table [table1]: previous policy
[RowFilterPolicy{rowFilter=random-column IS NULL}] and new policy
[RowFilterPolicy{rowFilter=some-column IS NULL}]",
e.getMessage()
);
diff --git
a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
index fa57ea19660..330a40363ae 100644
--- a/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
+++ b/processing/src/test/java/org/apache/druid/query/JoinDataSourceTest.java
@@ -505,11 +505,12 @@ public class JoinDataSourceTest
@Test
public void testGetAnalysisWithRestrictedDS()
{
+ RestrictedDataSource left = RestrictedDataSource.create(
+ new TableDataSource("table1"),
+ NoRestrictionPolicy.instance()
+ );
JoinDataSource dataSource = JoinDataSource.create(
- RestrictedDataSource.create(
- new TableDataSource("table1"),
- NoRestrictionPolicy.instance()
- ),
+ left,
new TableDataSource("table2"),
"j.",
"x == \"j.x\"",
@@ -520,6 +521,7 @@ public class JoinDataSourceTest
JoinAlgorithm.BROADCAST
);
DataSourceAnalysis analysis = dataSource.getAnalysis();
+ Assert.assertEquals(left, analysis.getBaseDataSource());
Assert.assertEquals("table1",
analysis.getBaseDataSource().getTableNames().iterator().next());
}
diff --git
a/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
b/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
index 2a51bc60053..b5ef7c24ffe 100644
---
a/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
+++
b/processing/src/test/java/org/apache/druid/query/RestrictedDataSourceTest.java
@@ -112,17 +112,10 @@ public class RestrictedDataSourceTest
Assert.assertEquals(newRestrictedDataSource.getBase(), barDataSource);
}
- @Test
- public void test_withAnalysis()
- {
- Assert.assertEquals(restrictedFooDataSource.getAnalysis(),
fooDataSource.getAnalysis());
- Assert.assertEquals(restrictedBarDataSource.getAnalysis(),
barDataSource.getAnalysis());
- }
-
@Test
public void test_equals()
{
-
EqualsVerifier.forClass(RestrictedDataSource.class).usingGetClass().withNonnullFields("base").verify();
+
EqualsVerifier.forClass(RestrictedDataSource.class).usingGetClass().withNonnullFields("base",
"policy").verify();
}
@Test
diff --git
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
index 522e58daf1f..42661431ce9 100644
---
a/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
+++
b/processing/src/test/java/org/apache/druid/query/planning/DataSourceAnalysisTest.java
@@ -31,11 +31,13 @@ import org.apache.druid.query.JoinAlgorithm;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.RestrictedDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.TrueDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.policy.NoRestrictionPolicy;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -54,6 +56,10 @@ public class DataSourceAnalysisTest
private static final List<Interval> MILLENIUM_INTERVALS =
ImmutableList.of(Intervals.of("2000/3000"));
private static final TableDataSource TABLE_FOO = new TableDataSource("foo");
private static final TableDataSource TABLE_BAR = new TableDataSource("bar");
+ private static final RestrictedDataSource RESTRICTED_FOO =
RestrictedDataSource.create(
+ TABLE_FOO,
+ NoRestrictionPolicy.instance()
+ );
private static final LookupDataSource LOOKUP_LOOKYLOO = new
LookupDataSource("lookyloo");
private static final InlineDataSource INLINE = InlineDataSource.fromIterable(
ImmutableList.of(new Object[0]),
@@ -79,6 +85,25 @@ public class DataSourceAnalysisTest
Assert.assertTrue(analysis.isBaseColumn("foo"));
}
+ @Test
+ public void testRestricted()
+ {
+ final DataSourceAnalysis analysis = RESTRICTED_FOO.getAnalysis();
+
+ Assert.assertTrue(analysis.isConcreteBased());
+ Assert.assertTrue(analysis.isTableBased());
+ Assert.assertTrue(analysis.isConcreteAndTableBased());
+ Assert.assertEquals(RESTRICTED_FOO, analysis.getBaseDataSource());
+ Assert.assertEquals(Optional.of(TABLE_FOO),
analysis.getBaseTableDataSource());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseUnionDataSource());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseQuery());
+ Assert.assertEquals(Optional.empty(), analysis.getBaseQuerySegmentSpec());
+ Assert.assertEquals(Collections.emptyList(),
analysis.getPreJoinableClauses());
+ Assert.assertFalse(analysis.isGlobal());
+ Assert.assertFalse(analysis.isJoin());
+ Assert.assertTrue(analysis.isBaseColumn("foo"));
+ }
+
@Test
public void testUnion()
{
diff --git
a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
index d63a162d56d..e12ca682bb0 100644
--- a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
@@ -307,13 +307,13 @@ public class QueryLifecycleTest
}
@Test
- public void testRunSimple_foundMultiplePolicyRestrictions()
+ public void testRunSimple_foundDifferentPolicyRestrictions()
{
// Multiple policy restrictions indicate that the system is most likely trying
to double-authorize the request.
// This is not allowed in any case.
expectedException.expect(ISE.class);
expectedException.expectMessage(
- "Multiple restrictions on table [some_datasource]: policy
[RowFilterPolicy{rowFilter=some-column IS NULL}] and policy
[RowFilterPolicy{rowFilter=some-column2 IS NULL}]");
+ "Different restrictions on table [some_datasource]: previous policy
[RowFilterPolicy{rowFilter=some-column IS NULL}] and new policy
[RowFilterPolicy{rowFilter=some-column2 IS NULL}]");
DimFilter originalFilterOnRDS = new NullFilter("some-column", null);
Policy originalFilterPolicy = RowFilterPolicy.from(originalFilterOnRDS);
diff --git
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 1f0210422dd..50fc1f20f55 100644
---
a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++
b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -42,7 +42,6 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReferenceCountingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext.Keys;
import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.druid.query.planning.DataSourceAnalysis;
@@ -120,7 +119,7 @@ public class TestClusterQuerySegmentWalker implements
QuerySegmentWalker
throw new ISE("Cannot handle datasource: %s",
queryPlus.getQuery().getDataSource());
}
- final String dataSourceName = ((TableDataSource)
analysis.getBaseDataSource()).getName();
+ final String dataSourceName =
analysis.getBaseTableDataSource().get().getName();
FunctionalIterable<SegmentDescriptor> segmentDescriptors =
FunctionalIterable
.create(intervals)
@@ -146,7 +145,7 @@ public class TestClusterQuerySegmentWalker implements
QuerySegmentWalker
throw new ISE("Cannot handle datasource: %s", dataSourceFromQuery);
}
- final String dataSourceName = ((TableDataSource)
analysis.getBaseDataSource()).getName();
+ final String dataSourceName =
analysis.getBaseTableDataSource().get().getName();
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
index 280d2d85cd5..ddf84f21590 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
@@ -312,7 +312,8 @@ public class DruidQueryGenerator
source.rowSignature,
plannerContext,
rexBuilder,
- !(topLevel) && tweaks.finalizeSubQuery()
+ !(topLevel) && tweaks.finalizeSubQuery(),
+ false
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
index f3b45506a2f..37872ee302f 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidCorrelateUnnestRel.java
@@ -317,7 +317,8 @@ public class DruidCorrelateUnnestRel extends
DruidRel<DruidCorrelateUnnestRel>
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations,
- null
+ null,
+ false
);
}
@@ -338,7 +339,8 @@ public class DruidCorrelateUnnestRel extends
DruidRel<DruidCorrelateUnnestRel>
),
getPlannerContext(),
getCluster().getRexBuilder(),
- false
+ false,
+ true
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
index 312ba1885dd..4a1e3bf81da 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java
@@ -249,7 +249,8 @@ public class DruidJoinQueryRel extends
DruidRel<DruidJoinQueryRel>
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations,
- sourceDesc.virtualColumnRegistry
+ sourceDesc.virtualColumnRegistry,
+ false
);
}
@@ -264,7 +265,8 @@ public class DruidJoinQueryRel extends
DruidRel<DruidJoinQueryRel>
),
getPlannerContext(),
getCluster().getRexBuilder(),
- false
+ false,
+ true
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
index fd9bc03c0b7..3e1a28b7b15 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidOuterQueryRel.java
@@ -119,7 +119,8 @@ public class DruidOuterQueryRel extends
DruidRel<DruidOuterQueryRel>
sourceRowSignature,
getPlannerContext(),
getCluster().getRexBuilder(),
- finalizeAggregations
+ finalizeAggregations,
+ false
);
}
@@ -134,7 +135,8 @@ public class DruidOuterQueryRel extends
DruidRel<DruidOuterQueryRel>
),
getPlannerContext(),
getCluster().getRexBuilder(),
- false
+ false,
+ true
);
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 0330b9db909..b4cf04d4bc4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -206,7 +206,8 @@ public class DruidQuery
final PlannerContext plannerContext,
final RexBuilder rexBuilder,
final boolean finalizeAggregations,
- @Nullable VirtualColumnRegistry virtualColumnRegistry
+ @Nullable VirtualColumnRegistry virtualColumnRegistry,
+ final boolean applyPolicies
)
{
final RelDataType outputRowType = partialQuery.leafRel().getRowType();
@@ -302,7 +303,7 @@ public class DruidQuery
}
return new DruidQuery(
- dataSource,
+ applyPolicies ? dataSource :
dataSource.withPolicies(plannerContext.getAuthorizationResult().getPolicyMap()),
plannerContext,
filter,
selectProjection,
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
index c9d3e4f7360..498c8402321 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQueryRel.java
@@ -131,7 +131,8 @@ public class DruidQueryRel extends DruidRel<DruidQueryRel>
druidTable.getRowSignature(),
getPlannerContext(),
getCluster().getRexBuilder(),
- finalizeAggregations
+ finalizeAggregations,
+ false
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java
index c7e69cfe66d..f815779db16 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java
@@ -165,7 +165,8 @@ public class DruidUnionDataSourceRel extends
DruidRel<DruidUnionDataSourceRel>
signature,
getPlannerContext(),
getCluster().getRexBuilder(),
- finalizeAggregations
+ finalizeAggregations,
+ false
);
}
@@ -180,7 +181,8 @@ public class DruidUnionDataSourceRel extends
DruidRel<DruidUnionDataSourceRel>
),
getPlannerContext(),
getCluster().getRexBuilder(),
- false
+ false,
+ true
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
index 171bdeaa69d..6ab804552e2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/PartialDruidQuery.java
@@ -519,7 +519,8 @@ public class PartialDruidQuery
final RowSignature sourceRowSignature,
final PlannerContext plannerContext,
final RexBuilder rexBuilder,
- final boolean finalizeAggregations
+ final boolean finalizeAggregations,
+ final boolean applyPolicies
)
{
return DruidQuery.fromPartialQuery(
@@ -529,7 +530,8 @@ public class PartialDruidQuery
plannerContext,
rexBuilder,
finalizeAggregations,
- null
+ null,
+ applyPolicies
);
}
@@ -539,7 +541,8 @@ public class PartialDruidQuery
final PlannerContext plannerContext,
final RexBuilder rexBuilder,
final boolean finalizeAggregations,
- @Nullable VirtualColumnRegistry virtualColumnRegistry
+ @Nullable VirtualColumnRegistry virtualColumnRegistry,
+ final boolean applyPolicies
)
{
return DruidQuery.fromPartialQuery(
@@ -549,7 +552,8 @@ public class PartialDruidQuery
plannerContext,
rexBuilder,
finalizeAggregations,
- virtualColumnRegistry
+ virtualColumnRegistry,
+ applyPolicies
);
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java
index f264fb75801..7f76b4d0140 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSelectQueryTest.java
@@ -33,6 +33,8 @@ import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Order;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.RestrictedDataSource;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
@@ -1165,7 +1167,10 @@ public class CalciteSelectQueryTest extends
BaseCalciteQueryTest
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.RESTRICTED_DATASOURCE)
+ .dataSource(RestrictedDataSource.create(
+
TableDataSource.create(CalciteTests.RESTRICTED_DATASOURCE),
+ CalciteTests.POLICY_NO_RESTRICTION_SUPERUSER
+ ))
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
@@ -1183,7 +1188,10 @@ public class CalciteSelectQueryTest extends
BaseCalciteQueryTest
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(
Druids.newTimeseriesQueryBuilder()
- .dataSource(CalciteTests.RESTRICTED_DATASOURCE)
+ .dataSource(RestrictedDataSource.create(
+
TableDataSource.create(CalciteTests.RESTRICTED_DATASOURCE),
+ CalciteTests.POLICY_RESTRICTION
+ ))
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.aggregators(aggregators(new CountAggregatorFactory("a0")))
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 25396de322a..abf32c9bf32 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -147,10 +147,10 @@ public class CalciteTests
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
- boolean isRestrictedTable =
resource.getName().equals(RESTRICTED_DATASOURCE);
+ boolean readRestrictedTable =
resource.getName().equals(RESTRICTED_DATASOURCE) && action.equals(Action.READ);
if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) {
- return isRestrictedTable ?
Access.allowWithRestriction(POLICY_NO_RESTRICTION_SUPERUSER) : Access.OK;
+ return readRestrictedTable ?
Access.allowWithRestriction(POLICY_NO_RESTRICTION_SUPERUSER) : Access.OK;
}
switch (resource.getType()) {
@@ -159,7 +159,7 @@ public class CalciteTests
case FORBIDDEN_DATASOURCE:
return Access.DENIED;
default:
- return isRestrictedTable ?
Access.allowWithRestriction(POLICY_RESTRICTION) : Access.OK;
+ return readRestrictedTable ?
Access.allowWithRestriction(POLICY_RESTRICTION) : Access.OK;
}
case ResourceType.VIEW:
if ("forbiddenView".equals(resource.getName())) {
@@ -191,10 +191,10 @@ public class CalciteTests
public Authorizer getAuthorizer(String name)
{
return (authenticationResult, resource, action) -> {
- boolean isRestrictedTable =
resource.getName().equals(RESTRICTED_DATASOURCE);
+ boolean readRestrictedTable =
resource.getName().equals(RESTRICTED_DATASOURCE) && action.equals(Action.READ);
if (TEST_SUPERUSER_NAME.equals(authenticationResult.getIdentity())) {
- return isRestrictedTable ?
Access.allowWithRestriction(POLICY_NO_RESTRICTION_SUPERUSER) : Access.OK;
+ return readRestrictedTable ?
Access.allowWithRestriction(POLICY_NO_RESTRICTION_SUPERUSER) : Access.OK;
}
switch (resource.getType()) {
@@ -202,7 +202,7 @@ public class CalciteTests
if (FORBIDDEN_DATASOURCE.equals(resource.getName())) {
return Access.DENIED;
} else {
- return isRestrictedTable ?
Access.allowWithRestriction(POLICY_RESTRICTION) : Access.OK;
+ return readRestrictedTable ?
Access.allowWithRestriction(POLICY_RESTRICTION) : Access.OK;
}
case ResourceType.VIEW:
if ("forbiddenView".equals(resource.getName())) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]