This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 2e87b5a901 Input source security sql layer can handle input source
with multiple types (#14050)
2e87b5a901 is described below
commit 2e87b5a901f69029bd3eccb9c23ebd9fdbe01dec
Author: zachjsh <[email protected]>
AuthorDate: Mon Apr 10 09:48:57 2023 -0400
Input source security sql layer can handle input source with multiple types
(#14050)
### Description
This change allows input sources used during MSQ ingestion to be
authorized for multiple input source types, instead of just one. An example of
an input source that allows multiple types is the CombiningInputSource.
Also fixed a bug that caused some input-source-specific functions to be
authorized against the permissions
`
[
new ResourceAction(new Resource(ResourceType.EXTERNAL,
ResourceType.EXTERNAL), Action.READ),
new ResourceAction(new Resource(ResourceType.EXTERNAL,
{input_source_type}), Action.READ)
]
`
when the inputSource based authorization feature is enabled, when it should
instead be authorized against
`
[
new ResourceAction(new Resource(ResourceType.EXTERNAL,
{input_source_type}), Action.READ)
]
`
---
.../catalog/model/table/S3InputSourceDefnTest.java | 22 ++---
.../catalog/model/table/BaseInputSourceDefn.java | 5 +-
.../catalog/model/table/ExternalTableSpec.java | 8 +-
.../model/table/FormattedInputSourceDefn.java | 3 +-
.../model/table/HttpInputSourceDefnTest.java | 10 +--
.../model/table/InlineInputSourceDefnTest.java | 6 +-
.../model/table/LocalInputSourceDefnTest.java | 2 +-
.../calcite/external/DruidExternTableMacro.java | 12 +--
.../external/ExternalOperatorConversion.java | 6 +-
.../druid/sql/calcite/external/Externals.java | 8 +-
.../external/SchemaAwareUserDefinedTableMacro.java | 13 +--
.../druid/sql/calcite/table/ExternalTable.java | 14 ++--
.../druid/sql/calcite/BaseCalciteQueryTest.java | 2 +-
.../druid/sql/calcite/CalciteIngestionDmlTest.java | 78 +++++++++++++++++-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 93 ++++++++++++++++++++++
.../druid/sql/calcite/IngestTableFunctionTest.java | 60 ++++++++++++++
...mExternalWithoutSecuritySupport-logicalPlan.txt | 3 +
17 files changed, 293 insertions(+), 52 deletions(-)
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
index 759cbc6875..e0c1035881 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
@@ -345,7 +345,7 @@ public class S3InputSourceDefnTest
CatalogUtils.stringListToUriList(uris),
s3InputSource.getUris()
);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(),
mapper));
@@ -366,7 +366,7 @@ public class S3InputSourceDefnTest
CatalogUtils.stringListToUriList(uris),
s3InputSource.getUris()
);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -399,7 +399,7 @@ public class S3InputSourceDefnTest
s3InputSource.getUris()
);
assertEquals("*.csv", s3InputSource.getObjectGlob());
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -419,7 +419,7 @@ public class S3InputSourceDefnTest
CatalogUtils.stringListToUriList(prefixes),
s3InputSource.getPrefixes()
);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(),
mapper));
@@ -442,7 +442,7 @@ public class S3InputSourceDefnTest
CatalogUtils.stringListToUriList(prefixes),
s3InputSource.getPrefixes()
);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -462,7 +462,7 @@ public class S3InputSourceDefnTest
CloudObjectLocation obj = s3InputSource.getObjects().get(0);
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// But, it fails if there are no columns.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(),
mapper));
@@ -510,7 +510,7 @@ public class S3InputSourceDefnTest
obj = s3InputSource.getObjects().get(1);
assertEquals("foo.com", obj.getBucket());
assertEquals("mumble/file2.csv", obj.getPath());
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -578,7 +578,7 @@ public class S3InputSourceDefnTest
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
ExternalTableSpec externSpec = externDefn.convert(resolved);
assertEquals(s3InputSource, externSpec.inputSource);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
@@ -613,7 +613,7 @@ public class S3InputSourceDefnTest
ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
ExternalTableSpec externSpec = externDefn.convert(resolved);
assertEquals(s3InputSource, externSpec.inputSource);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// Get the partial table function
TableFunction fn = externDefn.tableFn(resolved);
@@ -672,7 +672,7 @@ public class S3InputSourceDefnTest
CloudObjectLocation obj = s3InputSource.getObjects().get(0);
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// But, it fails columns are provided since the table already has them.
assertThrows(IAE.class, () -> fn.apply("x", args, COLUMNS, mapper));
@@ -714,7 +714,7 @@ public class S3InputSourceDefnTest
assertEquals("foo.com", obj.getBucket());
assertEquals("bar/file.csv", obj.getPath());
assertTrue(externSpec.inputFormat instanceof CsvInputFormat);
- assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
// But, it fails columns are not provided since the table does not have
them.
assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(),
mapper));
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 83a085ead4..660e73c352 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.utils.CollectionUtils;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -152,7 +153,7 @@ public abstract class BaseInputSourceDefn implements
InputSourceDefn
convertArgsToSource(args, jsonMapper),
convertArgsToFormat(args, columns, jsonMapper),
Columns.convertSignature(columns),
- typeValue()
+ () -> Collections.singleton(typeValue())
);
}
@@ -209,7 +210,7 @@ public abstract class BaseInputSourceDefn implements
InputSourceDefn
convertTableToSource(table),
convertTableToFormat(table),
Columns.convertSignature(table.resolvedTable().spec().columns()),
- typeValue()
+ () -> Collections.singleton(typeValue())
);
}
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
index a132ff6728..7ea408004d 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
@@ -19,11 +19,13 @@
package org.apache.druid.catalog.model.table;
+import com.google.common.base.Supplier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
+import java.util.Set;
/**
* Catalog form of an external table specification used to pass along the three
@@ -36,17 +38,17 @@ public class ExternalTableSpec
public final InputSource inputSource;
public final InputFormat inputFormat;
@Nullable public final RowSignature signature;
- public final String inputSourceType;
+ public final Supplier<Set<String>> inputSourceTypesSupplier;
public ExternalTableSpec(
final InputSource inputSource,
final InputFormat inputFormat,
final RowSignature signature,
- final String inputSourceType)
+ final Supplier<Set<String>> inputSourceTypesSupplier)
{
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.signature = signature;
- this.inputSourceType = inputSourceType;
+ this.inputSourceTypesSupplier = inputSourceTypesSupplier;
}
}
diff --git
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index bf0a904bca..a00e71b891 100644
---
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -194,7 +195,7 @@ public abstract class FormattedInputSourceDefn extends
BaseInputSourceDefn
convertSource(sourceMap, jsonMapper),
inputFormat,
Columns.convertSignature(completedCols),
- typeValue()
+ () -> Collections.singleton(typeValue())
);
}
}
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 21e6f14eac..67ceedf303 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -398,7 +398,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv",
"http://foo.com/bar.csv")),
sourceSpec.getUris()
);
- assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -442,7 +442,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/my.csv",
"http://foo.com/bar.csv")),
sourceSpec.getUris()
);
- assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -466,7 +466,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv",
"http://foo.com/bar.csv")),
sourceSpec.getUris()
);
- assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
@Test
@@ -499,7 +499,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
"SECRET",
((EnvironmentVariablePasswordProvider)
sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable()
);
- assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
private void validateHappyPath(ExternalTableSpec externSpec, boolean
withUser)
@@ -519,7 +519,7 @@ public class HttpInputSourceDefnTest extends
BaseExternTableTest
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
- assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
private Map<String, Object> httpToMap(HttpInputSource source)
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
index daae0bccd0..448e353664 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
@@ -144,7 +144,7 @@ public class InlineInputSourceDefnTest extends
BaseExternTableTest
CsvInputFormat format = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), format.getColumns());
assertEquals(2, extern.signature.size());
- assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+ assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY),
extern.inputSourceTypesSupplier.get());
// Fails if no columns are provided.
assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(),
Collections.emptyList(), mapper));
@@ -179,7 +179,7 @@ public class InlineInputSourceDefnTest extends
BaseExternTableTest
CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
assertEquals(2, extern.signature.size());
- assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+ assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY),
extern.inputSourceTypesSupplier.get());
// Cannot supply columns with the function
List<ColumnSpec> columns = Arrays.asList(
@@ -215,6 +215,6 @@ public class InlineInputSourceDefnTest extends
BaseExternTableTest
CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
assertEquals(2, extern.signature.size());
- assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+ assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY),
extern.inputSourceTypesSupplier.get());
}
}
diff --git
a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
index a71a00f4b9..1bbc74513b 100644
---
a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
+++
b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
@@ -536,6 +536,6 @@ public class LocalInputSourceDefnTest extends
BaseExternTableTest
assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
- assertEquals(LocalInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+ assertEquals(Collections.singleton(LocalInputSourceDefn.TYPE_KEY),
externSpec.inputSourceTypesSupplier.get());
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
index e8740ba5bb..e74a0fdd86 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
@@ -20,11 +20,11 @@
package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.NlsString;
+import org.apache.druid.data.input.InputSource;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
@@ -34,6 +34,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
@@ -55,11 +56,10 @@ public class DruidExternTableMacro extends
DruidUserDefinedTableMacro
String inputSourceStr = getInputSourceArgument(call);
try {
- JsonNode jsonNode = ((DruidTableMacro)
macro).getJsonMapper().readTree(inputSourceStr);
- return Collections.singleton(new ResourceAction(new Resource(
- ResourceType.EXTERNAL,
- jsonNode.get("type").asText()
- ), Action.READ));
+ InputSource inputSource = ((DruidTableMacro)
macro).getJsonMapper().readValue(inputSourceStr, InputSource.class);
+ return inputSource.getTypes().stream()
+ .map(inputSourceType -> new ResourceAction(new
Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
+ .collect(Collectors.toSet());
}
catch (JsonProcessingException e) {
// this shouldn't happen, the input source parameter should have been
validated before this
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 6b1b1fb624..f2f667af4e 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -111,12 +111,12 @@ public class ExternalOperatorConversion extends
DruidExternTableMacroConversion
}
String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
- String inputSrcType =
jsonMapper.readTree(inputSrcStr).get("type").asText();
+ InputSource inputSource = jsonMapper.readValue(inputSrcStr,
InputSource.class);
return new ExternalTableSpec(
- jsonMapper.readValue(inputSrcStr, InputSource.class),
+ inputSource,
jsonMapper.readValue(CatalogUtils.getString(args,
INPUT_FORMAT_PARAM), InputFormat.class),
rowSignature,
- inputSrcType
+ inputSource::getTypes
);
}
catch (JsonProcessingException e) {
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
index 595c825718..ee0c650242 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.external;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.rel.type.RelDataType;
@@ -55,6 +56,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -297,7 +299,7 @@ public class Externals
+ "Please change the column name to something other than
__time");
}
- return toExternalTable(spec, jsonMapper, spec.inputSourceType);
+ return toExternalTable(spec, jsonMapper, spec.inputSourceTypesSupplier);
}
public static ResourceAction externalRead(String name)
@@ -308,7 +310,7 @@ public class Externals
public static ExternalTable toExternalTable(
ExternalTableSpec spec,
ObjectMapper jsonMapper,
- String inputSourceType
+ Supplier<Set<String>> inputSourceTypesSupplier
)
{
return new ExternalTable(
@@ -319,7 +321,7 @@ public class Externals
),
spec.signature,
jsonMapper,
- inputSourceType
+ inputSourceTypesSupplier
);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index b05c0a0cd8..28c6236c09 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -48,6 +48,7 @@ import org.apache.druid.sql.calcite.table.ExternalTable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Table macro designed for use with the Druid EXTEND operator. Example:
@@ -172,12 +173,14 @@ public abstract class SchemaAwareUserDefinedTableMacro
{
Set<ResourceAction> resourceActions = new HashSet<>();
if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
- resourceActions.add(new ResourceAction(new Resource(
- ResourceType.EXTERNAL,
- ((ExternalTable) table).getInputSourceType()
- ), Action.READ));
+ resourceActions.addAll(((ExternalTable) table)
+ .getInputSourceTypeSupplier().get().stream()
+ .map(inputSourceType ->
+ new ResourceAction(new
Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
+ .collect(Collectors.toSet()));
+ } else {
+ resourceActions.addAll(base.computeResources(call,
inputSourceTypeSecurityEnabled));
}
- resourceActions.addAll(base.computeResources(call,
inputSourceTypeSecurityEnabled));
return resourceActions;
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
index 3c6fbdd0ac..bda5b43625 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.table;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelOptTable.ToRelContext;
import org.apache.calcite.rel.RelNode;
@@ -30,6 +31,8 @@ import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.external.ExternalTableScan;
+import java.util.Set;
+
/**
* Represents an source of data external to Druid: a CSV file, an HTTP
request, etc.
* Each such table represents one of Druid's {@link DataSource} types. Since
SQL
@@ -42,7 +45,7 @@ public class ExternalTable extends DruidTable
private final DataSource dataSource;
private final ObjectMapper objectMapper;
- private final String inputSourceType;
+ private final Supplier<Set<String>> inputSourceTypeSupplier;
/**
* Cached row type, to avoid recreating types multiple times.
@@ -53,13 +56,13 @@ public class ExternalTable extends DruidTable
final DataSource dataSource,
final RowSignature rowSignature,
final ObjectMapper objectMapper,
- final String inputSourceType
+ final Supplier<Set<String>> inputSourceTypesSupplier
)
{
super(rowSignature);
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.objectMapper = objectMapper;
- this.inputSourceType = inputSourceType;
+ this.inputSourceTypeSupplier = inputSourceTypesSupplier;
}
@Override
@@ -93,9 +96,9 @@ public class ExternalTable extends DruidTable
return rowType;
}
- public String getInputSourceType()
+ public Supplier<Set<String>> getInputSourceTypeSupplier()
{
- return inputSourceType;
+ return inputSourceTypeSupplier;
}
@Override
@@ -110,7 +113,6 @@ public class ExternalTable extends DruidTable
return "ExternalTable{" +
"dataSource=" + dataSource +
", rowSignature=" + getRowSignature() +
- ", inputSourceType=" + getInputSourceType() +
'}';
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index ea85780bd7..43eb443045 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -992,7 +992,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
* factory is specific to one test and one planner config. This method can be
* overridden to control the objects passed to the factory.
*/
- private SqlStatementFactory getSqlStatementFactory(
+ SqlStatementFactory getSqlStatementFactory(
PlannerConfig plannerConfig,
AuthConfig authConfig
)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
index 50d0bc445b..75d3fa2432 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.sql.calcite;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,8 +29,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
@@ -62,10 +69,15 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.internal.matchers.ThrowableMessageMatcher;
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
{
@@ -143,7 +155,9 @@ public class CalciteIngestionDmlTest extends
BaseCalciteQueryTest
public List<? extends Module> getJacksonModules()
{
// We want this module to bring input sources along for the ride.
- return new InputSourceModule().getJacksonModules();
+ List<Module> modules = new ArrayList<>(new
InputSourceModule().getJacksonModules());
+ modules.add(new
SimpleModule("test-module").registerSubtypes(TestFileInputSource.class));
+ return modules;
}
@Override
@@ -365,7 +379,7 @@ public class CalciteIngestionDmlTest extends
BaseCalciteQueryTest
final Throwable e = Assert.assertThrows(
Throwable.class,
() -> {
-
getSqlStatementFactory(plannerConfig).directStatement(sqlQuery()).execute();
+ getSqlStatementFactory(plannerConfig,
authConfig).directStatement(sqlQuery()).execute();
}
);
@@ -423,4 +437,64 @@ public class CalciteIngestionDmlTest extends
BaseCalciteQueryTest
.build();
}
}
+
+ static class TestFileInputSource extends AbstractInputSource implements
SplittableInputSource<File>
+ {
+ private final List<File> files;
+
+ @JsonCreator
+ TestFileInputSource(@JsonProperty("files") List<File> fileList)
+ {
+ files = fileList;
+ }
+
+ @JsonProperty
+ public List<File> getFiles()
+ {
+ return files;
+ }
+
+ @Override
+ public Stream<InputSplit<File>> createSplits(InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec)
+ {
+ return files.stream().map(InputSplit::new);
+ }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
+ {
+ return files.size();
+ }
+
+ @Override
+ public SplittableInputSource<File> withSplit(InputSplit<File> split)
+ {
+ return new TestFileInputSource(ImmutableList.of(split.get()));
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestFileInputSource that = (TestFileInputSource) o;
+ return Objects.equals(files, that.files);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(files);
+ }
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index d9b6b6e32f..fb184e89c8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.java.util.common.StringUtils;
@@ -55,6 +56,7 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
+import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -473,6 +475,97 @@ public class CalciteInsertDmlTest extends
CalciteIngestionDmlTest
.verify();
}
/**
 * INSERT from an extern() table function whose input source does not support
 * input-source-based security, with the feature DISABLED in {@link AuthConfig}.
 * The query must plan successfully and only require the generic EXTERNAL
 * resource action plus write on the target datasource.
 */
@Test
public void testInsertFromExternalWithoutSecuritySupport()
{
  // TestFileInputSource is a fixture that reports no input source types.
  InputSource inputSource =
      new TestFileInputSource(ImmutableList.of(new File("/tmp/foo.csv").getAbsoluteFile()));
  final ExternalDataSource externalDataSource = new ExternalDataSource(
      inputSource,
      new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
      RowSignature.builder()
                  .add("x", ColumnType.STRING)
                  .add("y", ColumnType.STRING)
                  .add("z", ColumnType.LONG)
                  .build()
  );
  // Build the extern() call by JSON-serializing the input source and format.
  String extern;
  ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
  try {
    extern = StringUtils.format(
        "TABLE(extern("
        + "inputSource => '%s',"
        + "inputFormat => '%s'))",
        queryJsonMapper.writeValueAsString(inputSource),
        queryJsonMapper.writeValueAsString(
            new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0)
        )
    );
  }
  catch (JsonProcessingException e) {
    throw new RuntimeException(e);
  }
  testIngestionQuery()
      .sql("INSERT INTO dst SELECT * FROM %s\n" +
           "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
           "PARTITIONED BY ALL TIME",
           extern
      )
      .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
      // Feature off: only the legacy catch-all EXTERNAL resource is checked.
      .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(false).build())
      .expectTarget("dst", externalDataSource.getSignature())
      .expectResources(dataSourceWrite("dst"), Externals.EXTERNAL_RESOURCE_ACTION)
      .expectQuery(
          newScanQueryBuilder()
              .dataSource(externalDataSource)
              .intervals(querySegmentSpec(Filtration.eternity()))
              .columns("x", "y", "z")
              .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
              .build()
      )
      .expectLogicalPlanFrom("InsertFromExternalWithoutSecuritySupport")
      .verify();
}
+
/**
 * INSERT from an extern() table function whose input source does not support
 * input-source-based security, with the feature ENABLED. Planning must fail
 * with a UOE-wrapping {@code SqlPlanningException} rather than silently
 * falling back to the catch-all EXTERNAL permission.
 */
@Test
public void testInsertFromExternalWithoutSecuritySupportWithInputsourceSecurityEnabled()
{
  String extern;
  ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
  try {
    extern = StringUtils.format(
        "TABLE(extern("
        + "inputSource => '%s',"
        + "inputFormat => '%s'))",
        queryJsonMapper.writeValueAsString(
            new TestFileInputSource(ImmutableList.of(new File("/tmp/foo.csv").getAbsoluteFile()))),
        queryJsonMapper.writeValueAsString(
            new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0)
        )
    );
  }
  catch (JsonProcessingException e) {
    throw new RuntimeException(e);
  }
  testIngestionQuery()
      .sql("INSERT INTO dst SELECT * FROM %s\n" +
           "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
           "PARTITIONED BY ALL TIME",
           extern
      )
      .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
      .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
      // NOTE(review): expectLogicalPlanFrom alongside expectValidationError looks
      // contradictory — presumably the plan expectation is ignored when a
      // validation error is expected; confirm against the test DSL.
      .expectLogicalPlanFrom("insertFromExternal")
      .expectValidationError(
          CoreMatchers.allOf(
              CoreMatchers.instanceOf(SqlPlanningException.class),
              ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
                  "org.apache.druid.java.util.common.UOE: This inputSource does not support input source based security"))
          )
      )
      .verify();
}
+
@Test
public void testInsertWithPartitionedBy()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index 83bb87436b..86136c7000 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.Externals;
@@ -110,6 +111,65 @@ public class IngestTableFunctionTest extends
CalciteIngestionDmlTest
.verify();
}
/**
 * Ingest via the {@code http} table function with input-source security left
 * at its default (disabled): only the generic EXTERNAL resource action plus
 * datasource write are required.
 */
@Test
public void testHttpFunction()
{
  String extern = "TABLE(http("
      + "userName => 'bob',"
      + "password => 'secret',"
      + "uris => ARRAY['http://foo.com/bar.csv'],"
      + "format => 'csv'))"
      + "  (x VARCHAR, y VARCHAR, z BIGINT)";
  testIngestionQuery()
      .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", extern)
      .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
      .expectTarget("dst", httpDataSource.getSignature())
      .expectResources(dataSourceWrite("dst"), Externals.EXTERNAL_RESOURCE_ACTION)
      .expectQuery(
          newScanQueryBuilder()
              .dataSource(httpDataSource)
              .intervals(querySegmentSpec(Filtration.eternity()))
              .columns("x", "y", "z")
              .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
              .build()
      )
      .expectLogicalPlanFrom("httpExtern")
      .verify();
}
+
/**
 * Same {@code http} table function ingest, but with input-source security
 * ENABLED: the required resource is the type-specific externalRead("http")
 * instead of the catch-all EXTERNAL action.
 */
@Test
public void testHttpFunctionWithInputsourceSecurity()
{
  String extern = "TABLE(http("
      + "userName => 'bob',"
      + "password => 'secret',"
      + "uris => ARRAY['http://foo.com/bar.csv'],"
      + "format => 'csv'))"
      + "  (x VARCHAR, y VARCHAR, z BIGINT)";
  testIngestionQuery()
      .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", extern)
      .authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
      .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
      .expectTarget("dst", httpDataSource.getSignature())
      // Type-specific permission replaces EXTERNAL_RESOURCE_ACTION when enabled.
      .expectResources(dataSourceWrite("dst"), externalRead("http"))
      .expectQuery(
          newScanQueryBuilder()
              .dataSource(httpDataSource)
              .intervals(querySegmentSpec(Filtration.eternity()))
              .columns("x", "y", "z")
              .context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
              .build()
      )
      .expectLogicalPlanFrom("httpExtern")
      .verify();
}
+
protected String externSqlByName(final ExternalDataSource externalDataSource)
{
ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
diff --git
a/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
b/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
new file mode 100644
index 0000000000..272f660d21
--- /dev/null
+++
b/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
@@ -0,0 +1,3 @@
+LogicalInsert(target=[dst], partitionedBy=[AllGranularity],
clusteredBy=[<none>])
+ LogicalProject(x=[$0], y=[$1], z=[$2])
+
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"CalciteIngestionDmlTest$TestFileInputSource","files":["/tmp/foo.csv"]},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]