This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e87b5a901 Input source security sql layer can handle input source 
with multiple types (#14050)
2e87b5a901 is described below

commit 2e87b5a901f69029bd3eccb9c23ebd9fdbe01dec
Author: zachjsh <[email protected]>
AuthorDate: Mon Apr 10 09:48:57 2023 -0400

    Input source security sql layer can handle input source with multiple types 
(#14050)
    
    ### Description
    
    This change allows input sources used during MSQ ingestion to be 
authorized for multiple input source types, instead of just one. An example of 
an input source that has multiple types is the CombiningInputSource.
    
    Also fixed a bug that caused some input-source-specific functions to be 
authorized against the permissions
    
    ```
    [
        new ResourceAction(new Resource(ResourceType.EXTERNAL, 
ResourceType.EXTERNAL), Action.READ),
        new ResourceAction(new Resource(ResourceType.EXTERNAL, 
{input_source_type}), Action.READ)
    ]
    ```
    
    when the inputSource-based authorization feature is enabled; they should 
instead be authorized against
    
    ```
    [
        new ResourceAction(new Resource(ResourceType.EXTERNAL, 
{input_source_type}), Action.READ)
    ]
    ```
---
 .../catalog/model/table/S3InputSourceDefnTest.java | 22 ++---
 .../catalog/model/table/BaseInputSourceDefn.java   |  5 +-
 .../catalog/model/table/ExternalTableSpec.java     |  8 +-
 .../model/table/FormattedInputSourceDefn.java      |  3 +-
 .../model/table/HttpInputSourceDefnTest.java       | 10 +--
 .../model/table/InlineInputSourceDefnTest.java     |  6 +-
 .../model/table/LocalInputSourceDefnTest.java      |  2 +-
 .../calcite/external/DruidExternTableMacro.java    | 12 +--
 .../external/ExternalOperatorConversion.java       |  6 +-
 .../druid/sql/calcite/external/Externals.java      |  8 +-
 .../external/SchemaAwareUserDefinedTableMacro.java | 13 +--
 .../druid/sql/calcite/table/ExternalTable.java     | 14 ++--
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |  2 +-
 .../druid/sql/calcite/CalciteIngestionDmlTest.java | 78 +++++++++++++++++-
 .../druid/sql/calcite/CalciteInsertDmlTest.java    | 93 ++++++++++++++++++++++
 .../druid/sql/calcite/IngestTableFunctionTest.java | 60 ++++++++++++++
 ...mExternalWithoutSecuritySupport-logicalPlan.txt |  3 +
 17 files changed, 293 insertions(+), 52 deletions(-)

diff --git 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
index 759cbc6875..e0c1035881 100644
--- 
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
+++ 
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/catalog/model/table/S3InputSourceDefnTest.java
@@ -345,7 +345,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(uris),
         s3InputSource.getUris()
     );
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), 
mapper));
@@ -366,7 +366,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(uris),
         s3InputSource.getUris()
     );
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -399,7 +399,7 @@ public class S3InputSourceDefnTest
         s3InputSource.getUris()
     );
     assertEquals("*.csv", s3InputSource.getObjectGlob());
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -419,7 +419,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(prefixes),
         s3InputSource.getPrefixes()
     );
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), 
mapper));
@@ -442,7 +442,7 @@ public class S3InputSourceDefnTest
         CatalogUtils.stringListToUriList(prefixes),
         s3InputSource.getPrefixes()
     );
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -462,7 +462,7 @@ public class S3InputSourceDefnTest
     CloudObjectLocation obj = s3InputSource.getObjects().get(0);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // But, it fails if there are no columns.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), 
mapper));
@@ -510,7 +510,7 @@ public class S3InputSourceDefnTest
     obj = s3InputSource.getObjects().get(1);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("mumble/file2.csv", obj.getPath());
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -578,7 +578,7 @@ public class S3InputSourceDefnTest
     ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
     ExternalTableSpec externSpec = externDefn.convert(resolved);
     assertEquals(s3InputSource, externSpec.inputSource);
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
@@ -613,7 +613,7 @@ public class S3InputSourceDefnTest
     ExternalTableDefn externDefn = (ExternalTableDefn) resolved.defn();
     ExternalTableSpec externSpec = externDefn.convert(resolved);
     assertEquals(s3InputSource, externSpec.inputSource);
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // Get the partial table function
     TableFunction fn = externDefn.tableFn(resolved);
@@ -672,7 +672,7 @@ public class S3InputSourceDefnTest
     CloudObjectLocation obj = s3InputSource.getObjects().get(0);
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // But, it fails columns are provided since the table already has them.
     assertThrows(IAE.class, () -> fn.apply("x", args, COLUMNS, mapper));
@@ -714,7 +714,7 @@ public class S3InputSourceDefnTest
     assertEquals("foo.com", obj.getBucket());
     assertEquals("bar/file.csv", obj.getPath());
     assertTrue(externSpec.inputFormat instanceof CsvInputFormat);
-    assertEquals(S3InputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(S3InputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
 
     // But, it fails columns are not provided since the table does not have 
them.
     assertThrows(IAE.class, () -> fn.apply("x", args, Collections.emptyList(), 
mapper));
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
index 83a085ead4..660e73c352 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/BaseInputSourceDefn.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.utils.CollectionUtils;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -152,7 +153,7 @@ public abstract class BaseInputSourceDefn implements 
InputSourceDefn
         convertArgsToSource(args, jsonMapper),
         convertArgsToFormat(args, columns, jsonMapper),
         Columns.convertSignature(columns),
-        typeValue()
+        () -> Collections.singleton(typeValue())
     );
   }
 
@@ -209,7 +210,7 @@ public abstract class BaseInputSourceDefn implements 
InputSourceDefn
         convertTableToSource(table),
         convertTableToFormat(table),
         Columns.convertSignature(table.resolvedTable().spec().columns()),
-        typeValue()
+        () -> Collections.singleton(typeValue())
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
index a132ff6728..7ea408004d 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/ExternalTableSpec.java
@@ -19,11 +19,13 @@
 
 package org.apache.druid.catalog.model.table;
 
+import com.google.common.base.Supplier;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
+import java.util.Set;
 
 /**
  * Catalog form of an external table specification used to pass along the three
@@ -36,17 +38,17 @@ public class ExternalTableSpec
   public final InputSource inputSource;
   public final InputFormat inputFormat;
   @Nullable public final RowSignature signature;
-  public final String inputSourceType;
+  public final Supplier<Set<String>> inputSourceTypesSupplier;
 
   public ExternalTableSpec(
       final InputSource inputSource,
       final InputFormat inputFormat,
       final RowSignature signature,
-      final String inputSourceType)
+      final Supplier<Set<String>> inputSourceTypesSupplier)
   {
     this.inputSource = inputSource;
     this.inputFormat = inputFormat;
     this.signature = signature;
-    this.inputSourceType = inputSourceType;
+    this.inputSourceTypesSupplier = inputSourceTypesSupplier;
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
 
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
index bf0a904bca..a00e71b891 100644
--- 
a/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
+++ 
b/server/src/main/java/org/apache/druid/catalog/model/table/FormattedInputSourceDefn.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -194,7 +195,7 @@ public abstract class FormattedInputSourceDefn extends 
BaseInputSourceDefn
         convertSource(sourceMap, jsonMapper),
         inputFormat,
         Columns.convertSignature(completedCols),
-        typeValue()
+        () -> Collections.singleton(typeValue())
     );
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
index 21e6f14eac..67ceedf303 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/HttpInputSourceDefnTest.java
@@ -398,7 +398,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", 
"http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
-    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -442,7 +442,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/my.csv", 
"http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
-    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -466,7 +466,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         
CatalogUtils.stringListToUriList(Arrays.asList("http://foo.com/foo.csv", 
"http://foo.com/bar.csv")),
         sourceSpec.getUris()
     );
-    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   @Test
@@ -499,7 +499,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
         "SECRET",
         ((EnvironmentVariablePasswordProvider) 
sourceSpec.getHttpAuthenticationPasswordProvider()).getVariable()
     );
-    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   private void validateHappyPath(ExternalTableSpec externSpec, boolean 
withUser)
@@ -519,7 +519,7 @@ public class HttpInputSourceDefnTest extends 
BaseExternTableTest
     assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
     assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
     assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
-    assertEquals(HttpInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(HttpInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 
   private Map<String, Object> httpToMap(HttpInputSource source)
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
index daae0bccd0..448e353664 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/InlineInputSourceDefnTest.java
@@ -144,7 +144,7 @@ public class InlineInputSourceDefnTest extends 
BaseExternTableTest
     CsvInputFormat format = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), format.getColumns());
     assertEquals(2, extern.signature.size());
-    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+    assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), 
extern.inputSourceTypesSupplier.get());
 
     // Fails if no columns are provided.
     assertThrows(IAE.class, () -> fn.apply("x", new HashMap<>(), 
Collections.emptyList(), mapper));
@@ -179,7 +179,7 @@ public class InlineInputSourceDefnTest extends 
BaseExternTableTest
     CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
     assertEquals(2, extern.signature.size());
-    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+    assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), 
extern.inputSourceTypesSupplier.get());
 
     // Cannot supply columns with the function
     List<ColumnSpec> columns = Arrays.asList(
@@ -215,6 +215,6 @@ public class InlineInputSourceDefnTest extends 
BaseExternTableTest
     CsvInputFormat actualFormat = (CsvInputFormat) extern.inputFormat;
     assertEquals(Arrays.asList("a", "b"), actualFormat.getColumns());
     assertEquals(2, extern.signature.size());
-    assertEquals(InlineInputSourceDefn.TYPE_KEY, extern.inputSourceType);
+    assertEquals(Collections.singleton(InlineInputSourceDefn.TYPE_KEY), 
extern.inputSourceTypesSupplier.get());
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
 
b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
index a71a00f4b9..1bbc74513b 100644
--- 
a/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
+++ 
b/server/src/test/java/org/apache/druid/catalog/model/table/LocalInputSourceDefnTest.java
@@ -536,6 +536,6 @@ public class LocalInputSourceDefnTest extends 
BaseExternTableTest
     assertEquals(Arrays.asList("x", "y"), sig.getColumnNames());
     assertEquals(ColumnType.STRING, sig.getColumnType(0).get());
     assertEquals(ColumnType.LONG, sig.getColumnType(1).get());
-    assertEquals(LocalInputSourceDefn.TYPE_KEY, externSpec.inputSourceType);
+    assertEquals(Collections.singleton(LocalInputSourceDefn.TYPE_KEY), 
externSpec.inputSourceTypesSupplier.get());
   }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
index e8740ba5bb..e74a0fdd86 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/DruidExternTableMacro.java
@@ -20,11 +20,11 @@
 package org.apache.druid.sql.calcite.external;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.util.NlsString;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.Resource;
 import org.apache.druid.server.security.ResourceAction;
@@ -34,6 +34,7 @@ import org.apache.druid.sql.calcite.table.DruidTable;
 import javax.validation.constraints.NotNull;
 import java.util.Collections;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Used by {@link ExternalOperatorConversion} to generate a {@link DruidTable}
@@ -55,11 +56,10 @@ public class DruidExternTableMacro extends 
DruidUserDefinedTableMacro
     String inputSourceStr = getInputSourceArgument(call);
 
     try {
-      JsonNode jsonNode = ((DruidTableMacro) 
macro).getJsonMapper().readTree(inputSourceStr);
-      return Collections.singleton(new ResourceAction(new Resource(
-          ResourceType.EXTERNAL,
-          jsonNode.get("type").asText()
-      ), Action.READ));
+      InputSource inputSource = ((DruidTableMacro) 
macro).getJsonMapper().readValue(inputSourceStr, InputSource.class);
+      return inputSource.getTypes().stream()
+          .map(inputSourceType -> new ResourceAction(new 
Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
+          .collect(Collectors.toSet());
     }
     catch (JsonProcessingException e) {
       // this shouldn't happen, the input source paraemeter should have been 
validated before this
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
index 6b1b1fb624..f2f667af4e 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/ExternalOperatorConversion.java
@@ -111,12 +111,12 @@ public class ExternalOperatorConversion extends 
DruidExternTableMacroConversion
         }
 
         String inputSrcStr = CatalogUtils.getString(args, INPUT_SOURCE_PARAM);
-        String inputSrcType = 
jsonMapper.readTree(inputSrcStr).get("type").asText();
+        InputSource inputSource = jsonMapper.readValue(inputSrcStr, 
InputSource.class);
         return new ExternalTableSpec(
-            jsonMapper.readValue(inputSrcStr, InputSource.class),
+            inputSource,
             jsonMapper.readValue(CatalogUtils.getString(args, 
INPUT_FORMAT_PARAM), InputFormat.class),
             rowSignature,
-            inputSrcType
+            inputSource::getTypes
         );
       }
       catch (JsonProcessingException e) {
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
index 595c825718..ee0c650242 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/external/Externals.java
@@ -20,6 +20,7 @@
 package org.apache.druid.sql.calcite.external;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.avatica.SqlType;
 import org.apache.calcite.rel.type.RelDataType;
@@ -55,6 +56,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -297,7 +299,7 @@ public class Externals
                     + "Please change the column name to something other than 
__time");
     }
 
-    return toExternalTable(spec, jsonMapper, spec.inputSourceType);
+    return toExternalTable(spec, jsonMapper, spec.inputSourceTypesSupplier);
   }
 
   public static ResourceAction externalRead(String name)
@@ -308,7 +310,7 @@ public class Externals
   public static ExternalTable toExternalTable(
       ExternalTableSpec spec,
       ObjectMapper jsonMapper,
-      String inputSourceType
+      Supplier<Set<String>> inputSourceTypesSupplier
   )
   {
     return new ExternalTable(
@@ -319,7 +321,7 @@ public class Externals
           ),
         spec.signature,
         jsonMapper,
-        inputSourceType
+        inputSourceTypesSupplier
     );
   }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
index b05c0a0cd8..28c6236c09 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/external/SchemaAwareUserDefinedTableMacro.java
@@ -48,6 +48,7 @@ import org.apache.druid.sql.calcite.table.ExternalTable;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Table macro designed for use with the Druid EXTEND operator. Example:
@@ -172,12 +173,14 @@ public abstract class SchemaAwareUserDefinedTableMacro
     {
       Set<ResourceAction> resourceActions = new HashSet<>();
       if (table instanceof ExternalTable && inputSourceTypeSecurityEnabled) {
-        resourceActions.add(new ResourceAction(new Resource(
-            ResourceType.EXTERNAL,
-            ((ExternalTable) table).getInputSourceType()
-        ), Action.READ));
+        resourceActions.addAll(((ExternalTable) table)
+                                   .getInputSourceTypeSupplier().get().stream()
+                                   .map(inputSourceType ->
+                                 new ResourceAction(new 
Resource(ResourceType.EXTERNAL, inputSourceType), Action.READ))
+                                   .collect(Collectors.toSet()));
+      } else {
+        resourceActions.addAll(base.computeResources(call, 
inputSourceTypeSecurityEnabled));
       }
-      resourceActions.addAll(base.computeResources(call, 
inputSourceTypeSecurityEnabled));
       return resourceActions;
     }
   }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
index 3c6fbdd0ac..bda5b43625 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/ExternalTable.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.table;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptTable.ToRelContext;
 import org.apache.calcite.rel.RelNode;
@@ -30,6 +31,8 @@ import org.apache.druid.query.DataSource;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.external.ExternalTableScan;
 
+import java.util.Set;
+
 /**
  * Represents an source of data external to Druid: a CSV file, an HTTP 
request, etc.
  * Each such table represents one of Druid's {@link DataSource} types. Since 
SQL
@@ -42,7 +45,7 @@ public class ExternalTable extends DruidTable
   private final DataSource dataSource;
   private final ObjectMapper objectMapper;
 
-  private final String inputSourceType;
+  private final Supplier<Set<String>> inputSourceTypeSupplier;
 
   /**
    * Cached row type, to avoid recreating types multiple times.
@@ -53,13 +56,13 @@ public class ExternalTable extends DruidTable
       final DataSource dataSource,
       final RowSignature rowSignature,
       final ObjectMapper objectMapper,
-      final String inputSourceType
+      final Supplier<Set<String>> inputSourceTypesSupplier
   )
   {
     super(rowSignature);
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.objectMapper = objectMapper;
-    this.inputSourceType = inputSourceType;
+    this.inputSourceTypeSupplier = inputSourceTypesSupplier;
   }
 
   @Override
@@ -93,9 +96,9 @@ public class ExternalTable extends DruidTable
     return rowType;
   }
 
-  public String getInputSourceType()
+  public Supplier<Set<String>> getInputSourceTypeSupplier()
   {
-    return inputSourceType;
+    return inputSourceTypeSupplier;
   }
 
   @Override
@@ -110,7 +113,6 @@ public class ExternalTable extends DruidTable
     return "ExternalTable{" +
            "dataSource=" + dataSource +
            ", rowSignature=" + getRowSignature() +
-           ", inputSourceType=" + getInputSourceType() +
            '}';
   }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index ea85780bd7..43eb443045 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -992,7 +992,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
    * factory is specific to one test and one planner config. This method can be
    * overridden to control the objects passed to the factory.
    */
-  private SqlStatementFactory getSqlStatementFactory(
+  SqlStatementFactory getSqlStatementFactory(
       PlannerConfig plannerConfig,
       AuthConfig authConfig
   )
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
index 50d0bc445b..75d3fa2432 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteIngestionDmlTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.sql.calcite;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -27,8 +29,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Binder;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.InlineInputSource;
+import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.DruidInjectorBuilder;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.ISE;
@@ -62,10 +69,15 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
 
 public class CalciteIngestionDmlTest extends BaseCalciteQueryTest
 {
@@ -143,7 +155,9 @@ public class CalciteIngestionDmlTest extends 
BaseCalciteQueryTest
       public List<? extends Module> getJacksonModules()
       {
         // We want this module to bring input sources along for the ride.
-        return new InputSourceModule().getJacksonModules();
+        List<Module> modules = new ArrayList<>(new 
InputSourceModule().getJacksonModules());
+        modules.add(new 
SimpleModule("test-module").registerSubtypes(TestFileInputSource.class));
+        return modules;
       }
 
       @Override
@@ -365,7 +379,7 @@ public class CalciteIngestionDmlTest extends 
BaseCalciteQueryTest
       final Throwable e = Assert.assertThrows(
           Throwable.class,
           () -> {
-            
getSqlStatementFactory(plannerConfig).directStatement(sqlQuery()).execute();
+            getSqlStatementFactory(plannerConfig, 
authConfig).directStatement(sqlQuery()).execute();
           }
       );
 
@@ -423,4 +437,64 @@ public class CalciteIngestionDmlTest extends 
BaseCalciteQueryTest
           .build();
     }
   }
+
+  static class TestFileInputSource extends AbstractInputSource implements 
SplittableInputSource<File>
+  {
+    private final List<File> files;
+
+    @JsonCreator
+    TestFileInputSource(@JsonProperty("files") List<File> fileList)
+    {
+      files = fileList;
+    }
+
+    @JsonProperty
+    public List<File> getFiles()
+    {
+      return files;
+    }
+
+    @Override
+    public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, 
@Nullable SplitHintSpec splitHintSpec)
+    {
+      return files.stream().map(InputSplit::new);
+    }
+
+    @Override
+    public int estimateNumSplits(InputFormat inputFormat, @Nullable 
SplitHintSpec splitHintSpec)
+    {
+      return files.size();
+    }
+
+    @Override
+    public SplittableInputSource<File> withSplit(InputSplit<File> split)
+    {
+      return new TestFileInputSource(ImmutableList.of(split.get()));
+    }
+
+    @Override
+    public boolean needsFormat()
+    {
+      return true;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestFileInputSource that = (TestFileInputSource) o;
+      return Objects.equals(files, that.files);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(files);
+    }
+  }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index d9b6b6e32f..fb184e89c8 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.java.util.common.StringUtils;
@@ -55,6 +56,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -473,6 +475,97 @@ public class CalciteInsertDmlTest extends 
CalciteIngestionDmlTest
         .verify();
   }
 
+  @Test
+  public void testInsertFromExternalWithoutSecuritySupport()
+  {
+    InputSource inputSource =
+        new TestFileInputSource(ImmutableList.of(new 
File("/tmp/foo.csv").getAbsoluteFile()));
+    final ExternalDataSource externalDataSource = new ExternalDataSource(
+        inputSource,
+        new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, 
false, 0),
+        RowSignature.builder()
+                    .add("x", ColumnType.STRING)
+                    .add("y", ColumnType.STRING)
+                    .add("z", ColumnType.LONG)
+                    .build()
+    );
+    String extern;
+    ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    try {
+      extern = StringUtils.format(
+          "TABLE(extern("
+          + "inputSource => '%s',"
+          + "inputFormat => '%s'))",
+          queryJsonMapper.writeValueAsString(inputSource),
+          queryJsonMapper.writeValueAsString(
+              new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, 
false, 0)
+          )
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s\n" +
+             "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+             "PARTITIONED BY ALL TIME",
+             extern
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        
.authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(false).build())
+        .expectTarget("dst", externalDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), 
Externals.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(externalDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                .context(PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("InsertFromExternalWithoutSecuritySupport")
+        .verify();
+  }
+
+  @Test
+  public void 
testInsertFromExternalWithoutSecuritySupportWithInputsourceSecurityEnabled()
+  {
+    String extern;
+    ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
+    try {
+      extern = StringUtils.format(
+          "TABLE(extern("
+          + "inputSource => '%s',"
+          + "inputFormat => '%s'))",
+          queryJsonMapper.writeValueAsString(
+              new TestFileInputSource(ImmutableList.of(new 
File("/tmp/foo.csv").getAbsoluteFile()))),
+          queryJsonMapper.writeValueAsString(
+              new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, 
false, 0)
+          )
+      );
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s\n" +
+             "  (x VARCHAR, y VARCHAR, z BIGINT)\n" +
+             "PARTITIONED BY ALL TIME",
+             extern
+        )
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        
.authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .expectLogicalPlanFrom("insertFromExternal")
+        .expectValidationError(
+            CoreMatchers.allOf(
+                CoreMatchers.instanceOf(SqlPlanningException.class),
+                ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+                    "org.apache.druid.java.util.common.UOE: This inputSource 
does not support input source based security"))
+            )
+        )
+        .verify();
+  }
+
   @Test
   public void testInsertWithPartitionedBy()
   {
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
index 83bb87436b..86136c7000 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestTableFunctionTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.sql.calcite.external.ExternalDataSource;
 import org.apache.druid.sql.calcite.external.Externals;
@@ -110,6 +111,65 @@ public class IngestTableFunctionTest extends 
CalciteIngestionDmlTest
         .verify();
   }
 
+  /**
+   * Http function with no explicit auth config: expects the generic external resource action
+   */
+  @Test
+  public void testHttpFunction()
+  {
+    String extern = "TABLE(http("
+             + "userName => 'bob',"
+             + "password => 'secret',"
+             + "uris => ARRAY['http://foo.com/bar.csv'],"
+             + "format => 'csv'))"
+             + "  (x VARCHAR, y VARCHAR, z BIGINT)";
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", 
extern)
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", httpDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), 
Externals.EXTERNAL_RESOURCE_ACTION)
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(httpDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("httpExtern")
+        .verify();
+  }
+
+  /**
+   * Http function with input source security enabled: expects read on the "http" external resource
+   */
+  @Test
+  public void testHttpFunctionWithInputsourceSecurity()
+  {
+    String extern = "TABLE(http("
+                    + "userName => 'bob',"
+                    + "password => 'secret',"
+                    + "uris => ARRAY['http://foo.com/bar.csv'],"
+                    + "format => 'csv'))"
+                    + "  (x VARCHAR, y VARCHAR, z BIGINT)";
+    testIngestionQuery()
+        .sql("INSERT INTO dst SELECT * FROM %s PARTITIONED BY ALL TIME", 
extern)
+        
.authConfig(AuthConfig.newBuilder().setEnableInputSourceSecurity(true).build())
+        .authentication(CalciteTests.SUPER_USER_AUTH_RESULT)
+        .expectTarget("dst", httpDataSource.getSignature())
+        .expectResources(dataSourceWrite("dst"), externalRead("http"))
+        .expectQuery(
+            newScanQueryBuilder()
+                .dataSource(httpDataSource)
+                .intervals(querySegmentSpec(Filtration.eternity()))
+                .columns("x", "y", "z")
+                
.context(CalciteIngestionDmlTest.PARTITIONED_BY_ALL_TIME_QUERY_CONTEXT)
+                .build()
+        )
+        .expectLogicalPlanFrom("httpExtern")
+        .verify();
+  }
+
   protected String externSqlByName(final ExternalDataSource externalDataSource)
   {
     ObjectMapper queryJsonMapper = queryFramework().queryJsonMapper();
diff --git 
a/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
 
b/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
new file mode 100644
index 0000000000..272f660d21
--- /dev/null
+++ 
b/sql/src/test/resources/calcite/expected/ingest/InsertFromExternalWithoutSecuritySupport-logicalPlan.txt
@@ -0,0 +1,3 @@
+LogicalInsert(target=[dst], partitionedBy=[AllGranularity], 
clusteredBy=[<none>])
+  LogicalProject(x=[$0], y=[$1], z=[$2])
+    
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"CalciteIngestionDmlTest$TestFileInputSource","files":["/tmp/foo.csv"]},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to